!pip install ipython==7.22.0
!pip install boto3
## Importing all necessary libraries to run code in this block.
import findspark
import json
import boto3
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from time import sleep
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, desc
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
## Initiating spark
# Locate the local Spark installation before the context below is created.
findspark.init()
# Name the application on the conf itself: an appName set on the session builder
# cannot rename a SparkContext that already exists.
conf = (SparkConf()
        .setAppName("streaming")
        .set("spark.driver.maxResultSize", "3g"))
# getOrCreate reuses an already-running context (e.g. when this cell is re-run)
# instead of failing because only one SparkContext may be active; in that case
# conf is not applied.
sc = SparkContext.getOrCreate(conf=conf)
# The session wraps the context created/reused above.
spark = SparkSession.builder.appName("streaming").getOrCreate()
sc
spark
## Preparing the schema for the data
# Infer the comment schema from the 1M-line JSON sample. new_schema is the schema the
# (commented-out) LZO reader applies with from_json, so the full dump needs no inference.
sample_json_path = "s3://bigdatateaching/reddit/sample-data/1m-line-sample.json"
data = spark.read.json(sample_json_path)
schema = data.schema.json()                    # schema serialised as a JSON string
schema_dict = json.loads(schema)
new_schema = StructType.fromJson(schema_dict)  # StructType rebuilt from that JSON
## Getting a list of all required files to be read in
# List the objects directly under reddit/lzo/ (Delimiter='/' stops the listing at that
# level), skip every key containing "index" (presumably the LZO .index sidecar files --
# confirm against the bucket), and turn the remaining keys into s3:// URIs.
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bigdatateaching')
files = [
    's3://bigdatateaching/' + obj.key
    for obj in my_bucket.objects.filter(Prefix='reddit/lzo/', Delimiter='/')
    if "index" not in obj.key
]
files
## One-time ingestion (disabled): the full LZO dump was read, parsed with new_schema and
## written to parquet once; every run now reads that parquet copy instead.
#start = time.time() ## Logging the time it takes to read the files
## Reading in all RC_*.lzo files as (key, line) records; x[1] below is one JSON comment line
## Commented out because the result was saved as a parquet file (below) and is read back from there
#posts_rdd = sc.newAPIHadoopFile("s3://bigdatateaching/reddit/lzo/RC_*.lzo",
#'com.hadoop.mapreduce.LzoTextInputFormat',
#'org.apache.hadoop.io.LongWritable',
#'org.apache.hadoop.io.Text')
## Applying the schema: wrap each line in a 'raw' column, parse it with from_json and flatten the struct into columns
#posts_df_append = posts_rdd.map(lambda x:Row(x[1])).toDF(['raw']).select(F.from_json('raw',new_schema).alias('json')).select('json.*')
#end = time.time() ## Logging the time it takes to read the files
#print ("Time elapsed:", end - start) ## Logging the time it takes to read the files
## Saving the pyspark dataframe to a parquet file (overwrites any previous copy)
#posts_df_append.write.parquet("s3://mdf-final-project-dhg32/full_data.parquet",mode="overwrite")
## Reading the saved parquet file of the dataset
posts_df_append = spark.read.parquet("s3://mdf-final-project-dhg32/full_data.parquet")
## Sample file (alternative input, disabled): the same read/parse steps applied to the
## 1M-line LZO sample instead of the full dump -- presumably for quick test runs.
#posts_rdd = sc.newAPIHadoopFile("s3://bigdatateaching/reddit/sample-data/1m-line-sample.json.lzo",
# 'com.hadoop.mapreduce.LzoTextInputFormat',
# 'org.apache.hadoop.io.LongWritable',
# 'org.apache.hadoop.io.Text')
#
#posts_df_append = posts_rdd.map(lambda x:Row(x[1])).toDF(['raw']).select(F.from_json('raw',new_schema).alias('json')).select('json.*')
## Keeping data in memory to use (caching is currently disabled)
#posts_df_append.cache()
## Printing the schema
posts_df_append.printSchema()
print("The total comments in these datasets are:",posts_df_append.count())
# Comments per subreddit as a list of (subreddit_name_prefixed, count) tuples.
# Aggregating with the DataFrame API keeps the work in the JVM instead of converting
# every full Row into a Python object via .rdd; a null subreddit still forms its own group.
sub_count_mr = [tuple(row) for row in
                posts_df_append.groupBy('subreddit_name_prefixed').count().collect()]
print("====================================================\n\n")
print("We have the total number of sub-reddits mentioned in these datasets along with the\nsub-reddit with most comments in these datasets as follows:\n\n")
print("There are a total of",len(sub_count_mr),"subreddits.\n\n")
print("The top-20 most comments are from:\n")
top_20 = sorted(sub_count_mr, key=lambda sub_cnt: sub_cnt[1], reverse=True)[:20]
for key, value in top_20:
    print("The sub-reddit", key, "has", value, "comments.\n")
print("\n====================================================")
For the purpose of this project, I want to analyze whether the words used in a post/comment govern the sentiment toward it. I want to know whether the choice of words in a post, rather than the post itself, makes other people upvote or downvote it. For this purpose, I will use the score column and turn it into a categorical variable. Scores less than 1 count as a negative sentiment toward the post, scores equal to 1 as a neutral sentiment, and scores greater than 1 as a positive sentiment. Scores equal to 1 are treated as neutral because comments have a score of 1 by default when they are posted. I will use this as my target variable, and a bag of words made from the 'body' variable as my features.
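I also randomly split my data into two parts of 5% and 95% so that I can work on a smaller dataset of about 21.8M comments instead of all 476M comments. The split is made after removing removed, deleted and empty comments and comments from user-profile subreddits.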
## Getting rid of removed, deleted and empty comments, and of comments posted in
## user-profile subreddits (subreddit_type == "user").
# One combined predicate is equivalent to a chain of separate filters: a null body or
# subreddit_type makes the predicate null or false, so those rows are dropped either way.
posts_df_append = posts_df_append.filter(
    ~F.col('body').isin("[removed]", "[deleted]", "")
    & (F.col('subreddit_type') != "user")
)
## Dropping unnecessary columns to reduce clutter (each column listed exactly once)
columns_to_drop = ['author_cakeday', 'author_created_utc', 'author_flair_background_color',
                   'author_flair_css_class', 'author_flair_richtext', 'author_flair_template_id',
                   'author_flair_text_color', 'id', 'gildings', 'author_flair_type',
                   'author_patreon_flair', 'subreddit_id', 'subreddit', 'retrieved_on',
                   'permalink', 'parent_id', 'link_id', 'created_utc', 'author_fullname',
                   'send_replies', 'no_follow', 'can_mod_post', 'author_flair_text', 'can_gild',
                   'collapsed_reason', 'edited', 'removal_reason', 'author', 'archived',
                   'collapsed', 'controversiality', 'distinguished', 'gilded', 'is_submitter',
                   'stickied', 'subreddit_type']
df = posts_df_append.drop(*columns_to_drop)
## Splitting the data to 5% of the total data for use in this project
# Only the 5% part (df) is used below; df_95 is kept but not used in the rest of this notebook.
SAMPLE_WEIGHTS = [0.05, 0.95]
SAMPLE_SEED = 10141997
df, df_95 = df.randomSplit(SAMPLE_WEIGHTS, seed=SAMPLE_SEED)
## Caching for faster processing in memory (currently disabled)
#df.cache()
df.show()
subset_size = df.count()
print("The total comments in my subsetted dataset are:", subset_size)
print("\n\nI will be splitting this dataset into train, and test sets.")
## Making a new column called sentiment which is categorical with 3 categories: Negative, Neutral, and Positive.
## This will be our target variable for analysis.
# score < 1 -> "Negative", score == 1 -> "Neutral", score > 1 -> "Positive";
# a null score matches no branch and leaves sentiment null.
score_col = F.col('score')
sentiment_expr = (
    F.when(score_col < 1, "Negative")
    .when(score_col == 1, "Neutral")
    .when(score_col > 1, "Positive")
)
df = df.withColumn('sentiment', sentiment_expr)
## Keeping only the modelling columns and one row per distinct comment body.
# dropDuplicates keeps an arbitrary row for each body, not the latest one (created_utc is
# no longer in df), so a repeated body with different scores ends up with one of its labels.
kept_columns = ['body', 'sentiment', 'subreddit_name_prefixed']
df = df.select(kept_columns).dropDuplicates(['body'])
print("====================================================\n\n")
# For each column, report how many distinct values it has and show three of them.
# The distinct DataFrame is built once per column and counted directly, without a
# round-trip through Python via .rdd.
for y in ['sentiment', 'body', 'subreddit_name_prefixed']:
    distinct_values = df.select(y).distinct()
    print("The variable",
          y,
          "has",
          distinct_values.count(),
          "unique values.\n\n",
          "Example of observations:\n",
          [row[0] for row in distinct_values.take(3)],
          "\n")
print("\n====================================================")
## Summary statistics and schema of the modelling dataset, then rows per sentiment class.
df.describe().show()
df.printSchema()
print("====================================================\n\n")
print("The 3 classes of the target variable have the following observations:\n\n")
class_counts = (df.groupBy('sentiment')
                .count()
                .orderBy(desc('count')))
class_counts.show()
print("\n====================================================")
## Bar chart denoting distribution of target variable.
sns.set_theme(style="whitegrid")
# Per-class counts are tiny (one row per sentiment), so bringing them to pandas is cheap.
sentiment_counts_pd = (df.groupBy('sentiment')
                       .count()
                       .orderBy(desc('count'))
                       .toPandas())
ax = sns.barplot(data=sentiment_counts_pd, x="sentiment", y="count")
ax.set(xlabel="Sentiment",
       ylabel="Number of Comments",
       title="Distribution of the Target Variable (Sentiment)")
df.show(10)
# Comments per (subreddit, sentiment), computed once and reused for both questions.
# NOTE: "most negatively/positively associated" here means the largest *number* of
# Negative/Positive comments, not the highest share, so large subreddits dominate.
sub_sentiment_counts = df.groupby(['subreddit_name_prefixed', 'sentiment']).count()


def _top_subreddit(label):
    """Return the subreddit with the most comments labelled *label*, or None if none exist.

    Only the single top row is brought to the driver (ties are broken arbitrarily).
    """
    top = (sub_sentiment_counts
           .filter(F.col('sentiment') == label)
           .orderBy(desc('count'))
           .first())
    return top['subreddit_name_prefixed'] if top is not None else None


## Most negative subreddit
print("The most negatively associated (more downvotes on comments than upvotes overall) sub-reddit in the dataset is ", _top_subreddit("Negative"))
## Most positive subreddit
print("\nThe most positively associated (more upvotes on comments than downvotes overall) sub-reddit in the dataset is ", _top_subreddit("Positive"))
## Making a feature matrix with the text
# Tokenizer: split the comment text on non-word characters (RegexTokenizer lower-cases by default).
regexTokenizer = RegexTokenizer(inputCol="body", outputCol="words", pattern=r"\W")
# Remove Spark's default English stop words.
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
# Bag of words: at most 10,000 terms, each present in at least 5 documents.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features",
                               vocabSize=10000, minDF=5)
# Encode the sentiment strings as numeric labels; StringIndexer orders labels by
# descending frequency by default, so label 0.0 is the most common class.
label_stringIdx = StringIndexer(inputCol="sentiment", outputCol="label")
# NOTE(review): the pipeline is fitted on all of df and the train/test split is taken
# afterwards from the transformed data, so the vocabulary (minDF) and the label index
# also see test rows. Consider splitting df first and fitting on the training part only.
text_stages = [regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]
pipeline = Pipeline(stages=text_stages)
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
dataset.show(5)
## Splitting dataset into train, and test sets
(train_set, test_set) = dataset.randomSplit([0.90, 0.10], seed = 10141997)
## Logistic Regression (pure L2 penalty, since elasticNetParam=0)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
## Fitting training data on model
lrModel = lr.fit(train_set)
## Getting set of predictions
predictions = lrModel.transform(test_set)
# Test comments predicted as label 0.0, sorted by their probability vector in descending
# order (presumably element-wise, i.e. by P(label 0) first -- confirm on your Spark version).
(predictions
 .filter(predictions['prediction'] == 0)
 .select("body", "sentiment", 'subreddit_name_prefixed', "probability", "label", "prediction")
 .orderBy("probability", ascending=False)
 .show(n=10, truncate=30))
# The evaluator's default metric is "f1" (weighted F1); state it explicitly and print the
# score so it is reported even when this is not the last expression of a notebook cell.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
print("Weighted F1 on the test set (bag-of-words model):", evaluator.evaluate(predictions))
# Create ParamGrid for Cross Validation: 3 regParam x 3 elasticNetParam values = 9 candidates
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .build())
# Defined before the CrossValidator so it is the metric used for model selection
# (weighted F1, larger is better) as well as for the final test-set score.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
# 5-fold Cross validation (9 candidates x 5 folds = 45 fits); the best candidate is then
# refit on all of train_set.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train_set)
predictions = cvModel.transform(test_set)
(predictions
 .filter(predictions['prediction'] == 0)
 .select("body", "sentiment", 'subreddit_name_prefixed', "probability", "label", "prediction")
 .orderBy("probability", ascending=False)
 .show(n=10, truncate=30))
print("Weighted F1 on the test set (cross-validated bag-of-words model):",
      evaluator.evaluate(predictions))
## Getting term-frequencies (terms are hashed into 10,000 buckets, so distinct terms can collide)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
## Getting inverse document frequencies (terms in fewer than 5 documents get weight 0)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
# Reuses the tokenizer, stop-word remover and label indexer defined earlier for the
# bag-of-words pipeline.
# NOTE(review): IDF and the label index are fitted on all of df before the train/test
# split, so they also see test rows.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
# Fit the pipeline to data.
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
## Splitting dataset into train, and test sets
(train_set, test_set) = dataset.randomSplit([0.90, 0.10], seed = 10141997)
## Logistic Regression (same hyper-parameters as the bag-of-words baseline)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
## Fitting training data on model
lrModel = lr.fit(train_set)
predictions = lrModel.transform(test_set)
(predictions
 .filter(predictions['prediction'] == 0)
 .select("body", "sentiment", 'subreddit_name_prefixed', "probability", "label", "prediction")
 .orderBy("probability", ascending=False)
 .show(n=10, truncate=30))
# Default metric is "f1" (weighted F1); explicit here and printed so it is always reported.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
print("Weighted F1 on the test set (TF-IDF model):", evaluator.evaluate(predictions))
## Shut down Spark and release its resources; the `spark` session shares this context,
## so neither can be used after this line.
sc.stop()