PPOL 567 Massive Data Analytics Big Data Project

McCourt School of Public Policy, Georgetown University, Spring 2021

Submitted by: Digvijay Ghotane, dhg32

In [1]:
!pip install ipython==7.22.0
Requirement already satisfied: ipython==7.22.0 in ./miniconda/lib/python3.7/site-packages (7.22.0)
Requirement already satisfied: pickleshare in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (0.7.5)
Requirement already satisfied: decorator in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (5.0.6)
Requirement already satisfied: jedi>=0.16 in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (0.18.0)
Requirement already satisfied: pygments in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (2.8.1)
Requirement already satisfied: backcall in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (0.2.0)
Requirement already satisfied: traitlets>=4.2 in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (5.0.5)
Requirement already satisfied: setuptools>=18.5 in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (52.0.0.post20210125)
Requirement already satisfied: prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0 in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (3.0.17)
Requirement already satisfied: pexpect>4.3 in ./miniconda/lib/python3.7/site-packages (from ipython==7.22.0) (4.8.0)
Requirement already satisfied: parso<0.9.0,>=0.8.0 in ./miniconda/lib/python3.7/site-packages (from jedi>=0.16->ipython==7.22.0) (0.8.2)
Requirement already satisfied: ptyprocess>=0.5 in ./miniconda/lib/python3.7/site-packages (from pexpect>4.3->ipython==7.22.0) (0.7.0)
Requirement already satisfied: wcwidth in ./miniconda/lib/python3.7/site-packages (from prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0->ipython==7.22.0) (0.2.5)
Requirement already satisfied: ipython-genutils in ./miniconda/lib/python3.7/site-packages (from traitlets>=4.2->ipython==7.22.0) (0.2.0)
In [2]:
!pip install boto3
Requirement already satisfied: boto3 in ./miniconda/lib/python3.7/site-packages (1.17.67)
Requirement already satisfied: s3transfer<0.5.0,>=0.4.0 in ./miniconda/lib/python3.7/site-packages (from boto3) (0.4.2)
Requirement already satisfied: botocore<1.21.0,>=1.20.67 in ./miniconda/lib/python3.7/site-packages (from boto3) (1.20.67)
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in ./miniconda/lib/python3.7/site-packages (from boto3) (0.10.0)
Requirement already satisfied: urllib3<1.27,>=1.25.4 in ./miniconda/lib/python3.7/site-packages (from botocore<1.21.0,>=1.20.67->boto3) (1.26.4)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in ./miniconda/lib/python3.7/site-packages (from botocore<1.21.0,>=1.20.67->boto3) (2.8.1)
Requirement already satisfied: six>=1.5 in ./miniconda/lib/python3.7/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.21.0,>=1.20.67->boto3) (1.15.0)
In [3]:
## Importing all necessary libraries to run code in this block.
import findspark
import json
import boto3
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from time import sleep
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, desc
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
In [4]:
## Initializing Spark.

findspark.init()

# Raise the driver's max result size so large collect() results (used in the
# EDA cells below) do not abort the job.
spark_conf = SparkConf().set("spark.driver.maxResultSize", "3g")
sc = SparkContext(conf=spark_conf)
spark = SparkSession.builder.appName("streaming").getOrCreate()

sc
Out[4]:

SparkContext

Spark UI

Version
v2.4.4
Master
yarn
AppName
pyspark-shell
In [5]:
spark
Out[5]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v2.4.4
Master
yarn
AppName
pyspark-shell
In [6]:
## Inferring the schema from a 1M-line sample file, then round-tripping it
## through JSON into a StructType that can be applied to the full dataset.

data = spark.read.json("s3://bigdatateaching/reddit/sample-data/1m-line-sample.json")

new_schema = StructType.fromJson(json.loads(data.schema.json()))
In [7]:
## Building the list of S3 paths to read in, skipping the .lzo.index files.

s3 = boto3.resource('s3')

my_bucket = s3.Bucket('bigdatateaching')

# One pass: keep every object under reddit/lzo/ whose key does not contain
# "index", and prefix it with the bucket URI.
files = [
    's3://bigdatateaching/' + obj.key
    for obj in my_bucket.objects.filter(Prefix='reddit/lzo/', Delimiter='/')
    if "index" not in obj.key
]

files
Out[7]:
['s3://bigdatateaching/reddit/lzo/RC_2018-10.lzo',
 's3://bigdatateaching/reddit/lzo/RC_2018-11.lzo',
 's3://bigdatateaching/reddit/lzo/RC_2018-12.lzo',
 's3://bigdatateaching/reddit/lzo/RC_2019-01.lzo']
In [8]:
#start = time.time() ## Logging time it takes to read files

## Reading in all four monthly LZO files as an RDD of (offset, line) pairs.
## Commented out because the result was saved as a parquet file (next cell)
## and is read back from s3://mdf-final-project-dhg32/full_data.parquet below.
#posts_rdd = sc.newAPIHadoopFile("s3://bigdatateaching/reddit/lzo/RC_*.lzo",
                                #'com.hadoop.mapreduce.LzoTextInputFormat',
                                #'org.apache.hadoop.io.LongWritable',
                                #'org.apache.hadoop.io.Text')

## Parsing each JSON line with the schema inferred above and flattening into a dataframe.
#posts_df_append = posts_rdd.map(lambda x:Row(x[1])).toDF(['raw']).select(F.from_json('raw',new_schema).alias('json')).select('json.*')

#end = time.time() ## Logging time it takes to read files
#print ("Time elapsed:", end - start) ## Logging time it takes to read files
In [9]:
## Saving the parsed dataframe to parquet (run once; the next cell reads it back).

#posts_df_append.write.parquet("s3://mdf-final-project-dhg32/full_data.parquet",mode="overwrite")
In [10]:
## Reading the saved parquet file of the dataset (written by the commented-out
## cell above from the four RC_2018-10 .. RC_2019-01 LZO files).
posts_df_append = spark.read.parquet("s3://mdf-final-project-dhg32/full_data.parquet")
In [11]:
## Alternative: read only the 1M-line sample file (useful for quick iteration).
#posts_rdd = sc.newAPIHadoopFile("s3://bigdatateaching/reddit/sample-data/1m-line-sample.json.lzo",
#                                    'com.hadoop.mapreduce.LzoTextInputFormat',
#                                    'org.apache.hadoop.io.LongWritable',
#                                    'org.apache.hadoop.io.Text')
#    
#posts_df_append = posts_rdd.map(lambda x:Row(x[1])).toDF(['raw']).select(F.from_json('raw',new_schema).alias('json')).select('json.*')
In [12]:
## Caching the full dataframe in memory — disabled; at 476M rows it does not
## fit, so downstream cells re-read from parquet instead.

#posts_df_append.cache()
In [13]:
## Printing the schema inferred for the Reddit comments dataset.

posts_df_append.printSchema()
root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_created_utc: long (nullable = true)
 |-- author_flair_background_color: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_richtext: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- e: string (nullable = true)
 |    |    |-- t: string (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- author_flair_template_id: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_flair_text_color: string (nullable = true)
 |-- author_flair_type: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_patreon_flair: boolean (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- can_mod_post: boolean (nullable = true)
 |-- collapsed: boolean (nullable = true)
 |-- collapsed_reason: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- gildings: struct (nullable = true)
 |    |-- gid_1: long (nullable = true)
 |    |-- gid_2: long (nullable = true)
 |    |-- gid_3: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- no_follow: boolean (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- removal_reason: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- send_replies: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit_name_prefixed: string (nullable = true)
 |-- subreddit_type: string (nullable = true)

Exploratory Data Analysis

Total Number of Comments

In [14]:
## Total number of comments across all four monthly files (~476.6M).
print("The total comments in these datasets are:",posts_df_append.count())
The total comments in these datasets are: 476570966

Total number of sub-reddits and comments from sub-reddits

In [15]:
## Count comments per subreddit with a map-reduce word count.
## Improvement over the original: instead of collect()-ing all ~233K
## (subreddit, count) pairs to the driver and sorting them in Python, the
## total is taken with count() and the top 20 with takeOrdered(), so only
## 20 rows ever reach the driver. The RDD is cached because it is used twice.
sub_count_mr = posts_df_append.rdd.map(lambda comment: (comment['subreddit_name_prefixed'], 1)).reduceByKey(lambda accum, n: accum + n)
sub_count_mr.cache()

n_subreddits = sub_count_mr.count()
top_20 = sub_count_mr.takeOrdered(20, key=lambda sub_cnt: -sub_cnt[1])
sub_count_mr.unpersist()

print("====================================================\n\n")
print("We have the total number of sub-reddits mentioned in these datasets along with the\nsub-reddit with most comments in these datasets as follows:\n\n")
print("There are a total of", n_subreddits, "subreddits.\n\n")
print("The top-20 most comments are from:\n")
for key, value in top_20:
    print("The sub-reddit", key, "has", value, "comments.\n")
print("\n====================================================")
====================================================


We have the total number of sub-reddits mentioned in these datasets along with the
sub-reddit with most comments in these datasets as follows:


There are a total of 233505 subreddits.


The top-20 most comments are from:

The sub-reddit r/AskReddit has 24442910 comments.

The sub-reddit r/politics has 8982956 comments.

The sub-reddit r/nfl has 5580920 comments.

The sub-reddit r/funny has 4283669 comments.

The sub-reddit r/nba has 4123484 comments.

The sub-reddit r/The_Donald has 3953808 comments.

The sub-reddit r/worldnews has 3453986 comments.

The sub-reddit r/CFB has 3226309 comments.

The sub-reddit r/gaming has 3192611 comments.

The sub-reddit r/FortNiteBR has 3084633 comments.

The sub-reddit r/dankmemes has 3042181 comments.

The sub-reddit r/AskOuija has 3008621 comments.

The sub-reddit r/news has 2883063 comments.

The sub-reddit r/pics has 2790086 comments.

The sub-reddit r/memes has 2598956 comments.

The sub-reddit r/soccer has 2494466 comments.

The sub-reddit r/leagueoflegends has 2460779 comments.

The sub-reddit r/Showerthoughts has 2317012 comments.

The sub-reddit r/reddeadredemption has 2294665 comments.

The sub-reddit r/unpopularopinion has 2250682 comments.


====================================================

Project Direction

For the purpose of this project, I want to analyze if the words used in a post/comment govern the sentiment toward the post. I want to know if the choice of words in a post makes other people upvote or downvote it rather than the post itself. For this purpose, I will use the score column and turn it into a categorical variable. Scores less than 1 will count as a negative sentiment toward the post, scores equal to 1 as a neutral sentiment, and scores greater than 1 as a positive sentiment. The reason scores equal to 1 are chosen as the neutral sentiment is that comments have a score of 1 by default when they are posted. I will use this as my target variable, and a bag of words made from the 'body' variable as my features.

I also randomly split my data into two parts of 5% and 95% to work on a smaller dataset of about 21.8M comments instead of all the 476M comments.

Data Cleaning

In [17]:
## Dropping comments whose body was removed, deleted, or is empty, and
## dropping user-profile "subreddits" (subreddit_type == "user"), all in a
## single filter (a conjunction of conditions is equivalent to the chained
## filters it replaces).

posts_df_append = posts_df_append.filter(
    (posts_df_append.body != "[removed]")
    & (posts_df_append.body != "[deleted]")
    & (posts_df_append.body != "")
    & (posts_df_append.subreddit_type != "user")
)
In [18]:
## Dropping columns not needed for the analysis to reduce clutter.
## Fix: the original list contained 'archived', 'author', and 'collapsed'
## twice each; the duplicates are removed here (Spark tolerates them, but
## they were redundant).

columns_to_drop = ['author_cakeday', 'author_created_utc', 'author_flair_background_color', 'author_flair_css_class',
                  'author_flair_richtext', 'author_flair_template_id', 'author_flair_text_color', 'id', 'gildings',
                  'author_flair_type', 'author_patreon_flair', 'subreddit_id', 'subreddit', 'retrieved_on',
                  'permalink', 'parent_id', 'link_id', 'created_utc', 'author_fullname', 'send_replies',
                  'no_follow', 'can_mod_post', 'author_flair_text', 'can_gild', 'collapsed_reason',
                  'edited', 'removal_reason', 'author', 'archived', 'collapsed',
                  'controversiality', 'distinguished', 'gilded', 'is_submitter',
                  'stickied', 'subreddit_type']

df = posts_df_append.drop(*columns_to_drop)
In [19]:
## Subsetting: keep a random 5% (~21.9M comments) for the project; the fixed
## seed makes the split reproducible. df_95 is not used again.

df, df_95 = df.randomSplit([0.05,0.95], seed = 10141997)
In [20]:
## Caching the 5% subset for faster in-memory processing — left disabled.

#df.cache()
In [21]:
## Peek at the remaining columns (body, score, subreddit_name_prefixed).
df.show()
+--------------------+-----+-----------------------+
|                body|score|subreddit_name_prefixed|
+--------------------+-----+-----------------------+
|	extern crate fut...|    1|                 r/rust|
|

&gt;jay has had...|   13|                 r/kpop|
|

Ako.. :-( kaso ...|    2|          r/Philippines|
|      

Dahmercrats.|    7|           r/The_Donald|
|

Fair argument b...|    2|        r/gameofthrones|
|

Snapshots:

1. ...|    1|     r/Gamingcirclejerk|
|

Snapshots:

1. ...|    2|     r/TopMindsOfReddit|
|

Snapshots:

1. ...|    2|    r/jesuschristreddit|
|

Snapshots:

1. ...|    2|   r/ShitRConservati...|
|

Snapshots:

1. ...|    1|     r/Gamingcirclejerk|
|

Thank you for m...|   -1|             r/thinkpad|
|
    public Light...|    9|              r/Unity3D|
|
#**QUICK FAQ**
#...|    1|             r/Sexsells|
|
#Welcome to r/Fa...|    2|       r/FashionRepsBST|
|
#[Make sure you ...|    1|              r/forhire|
|
&gt;A few weeks ...|    9|             r/Teachers|
|
&gt;Two Supreme ...|    2|        r/AdviceAnimals|
|
&gt;\*\*EDIT 2: ...|    8|                 r/IAmA|
|
* [Search for Da...|    1|            r/Cash4Cash|
|
**  **
/u/stabbo...|    1|              r/stabbot|
+--------------------+-----+-----------------------+
only showing top 20 rows

In [22]:
## Size of the 5% working subset (~21.9M comments).
print("The total comments in my subsetted dataset are:",df.count())
print("\n\nI will be splitting this dataset into train, and test sets.")
The total comments in my subsetted dataset are: 21894699


I will be splitting this dataset into train, and test sets.
In [23]:
## Making a new column called sentiment which is categorical with 3 categories: Negative, Neutral, and Positive.
## This will be our target variable for analysis: score < 1 -> Negative,
## score == 1 -> Neutral (comments start at score 1), score > 1 -> Positive.
## NOTE(review): there is no .otherwise(), so a row with a null score falls
## through every when() and gets a null sentiment — confirm this is intended.

df = df.withColumn('sentiment', 
              F.when(F.col('score') < 1, "Negative").when(F.col('score') == 1, "Neutral").when(F.col('score') > 1, "Positive"))
In [24]:
## Keeping only the three columns needed downstream and dropping duplicate
## comment bodies. NOTE(review): dropDuplicates keeps an arbitrary row per
## body — there is no ordering here, so it is not necessarily the latest one.

df = df.select(['body', 'sentiment', 'subreddit_name_prefixed']).dropDuplicates(['body'])
In [31]:
## For each remaining column, report the number of unique values and three
## example values.
## Improvement over the original: distinct() was evaluated twice per column
## (once for count(), once for take(3)), doubling a full shuffle over ~20M
## rows; here it is computed once, cached, and released after use.
print("====================================================\n\n")

for y in ['sentiment', 'body', 'subreddit_name_prefixed']:
    distinct_vals = df.select(y).distinct().rdd.map(lambda r: r[0])
    distinct_vals.cache()
    print("The variable",
          y,
          "has",
          distinct_vals.count(),
          "unique values.\n\n",
          "Example of observations:\n",
          distinct_vals.take(3),
          "\n")
    distinct_vals.unpersist()

print("\n====================================================")
====================================================


The variable sentiment has 3 unique values.

 Example of observations:
 ['Positive', 'Neutral', 'Negative'] 

The variable body has 20024966 unique values.

 Example of observations:
 [' Your post has been removed as it violates Reddit sitewide rules. No asking for upvotes. Read more [here](https://www.reddit.com/help/contentpolicy#section_prohibited_behavior)\n\n*I am a bot, and this action was performed automatically. Please [contact the moderators of this subreddit](/message/compose/?to=/r/PewdiepieSubmissions) if you have any questions or concerns.*', '"Here\'s my off the cuff response without knowing anything about him" he could be terrible but seems like there is more to him besides his latest position. ', '"Wow, why so glum, guys? Don\'t you realize how grateful you should be? When I was a kid, I was thankful to have food on the table, clothes on my back, and a roof over my head. Looks like you got all that, so what are you complaining about?"\n\n"Nazis are hunting down and systematically killing or imprisoning all Jews."\n\n"Psh. You\'re hidden! And there\'s nothing you can do about that right now, so don\'t bother worrying about it!"'] 

The variable subreddit_name_prefixed has 68832 unique values.

 Example of observations:
 ['r/trees', 'r/2healthbars', 'r/doctorwho'] 


====================================================
In [32]:
## Summary statistics; mean/stddev are NaN/null because all columns are strings.
df.describe().show()
+-------+--------------------+---------+-----------------------+
|summary|                body|sentiment|subreddit_name_prefixed|
+-------+--------------------+---------+-----------------------+
|  count|            20024966| 20024966|               20024966|
|   mean|                 NaN|     null|                   null|
| stddev|                 NaN|     null|                   null|
|    min|Good link, you a...| Negative|                  r/007|
|    max|               𨳒杘?| Positive|                  r/zzt|
+-------+--------------------+---------+-----------------------+

In [33]:
## Schema of the cleaned three-column dataframe.
df.printSchema()
root
 |-- body: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- subreddit_name_prefixed: string (nullable = true)

In [34]:
## Class balance of the target variable (Positive / Neutral / Negative).
print("====================================================\n\n")

print("The 3 classes of the target variable have the following observations:\n\n")
df.groupby(['sentiment']).count().orderBy(desc('count')).show()

print("\n====================================================")
====================================================


The 3 classes of the target variable have the following observations:


+---------+--------+
|sentiment|   count|
+---------+--------+
| Positive|11536391|
|  Neutral| 7035137|
| Negative| 1453438|
+---------+--------+


====================================================
In [35]:
## Bar chart denoting distribution of target variable.

# Materialize the per-class counts as a small pandas frame first, then plot it.
sentiment_counts = df.groupby(['sentiment']).count().orderBy(desc('count')).toPandas()

sns.set_theme(style = "whitegrid")
ax = sns.barplot(x = "sentiment", y = "count", data = sentiment_counts)
ax.set(xlabel = "Sentiment", ylabel = "Number of Comments", title = "Distribution of the Target Variable (Sentiment)")
/home/hadoop/miniconda/lib/python3.7/site-packages/pyspark/sql/dataframe.py:2102: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
  warnings.warn(msg)
Out[35]:
[Text(0.5, 0, 'Sentiment'),
 Text(0, 0.5, 'Number of Comments'),
 Text(0.5, 1.0, 'Distribution of the Target Variable (Sentiment)')]
In [36]:
## Sample rows after cleaning and sentiment labelling.
df.show(10)
+--------------------+---------+-----------------------+
|                body|sentiment|subreddit_name_prefixed|
+--------------------+---------+-----------------------+
|          




CHUNG|  Neutral|                r/memes|
|


&gt; Yes it do...|  Neutral|       r/QuakeChampions|
|


[***First Impr...| Positive|           r/FanFiction|
|


[***Made of Co...|  Neutral|         r/HPfanfiction|
|


[***Say a Pray...|  Neutral|         r/HPfanfiction|
|


[***When Harry...| Positive|         r/HPfanfiction|
|


[***insurgere*...|  Neutral|         r/HPfanfiction|
|

 I really feel ...| Negative|      r/leagueoflegends|
|

**Boku-tachi wa...| Positive|            r/anime_irl|
|

**Flying Witch*...|  Neutral|                r/manga|
+--------------------+---------+-----------------------+
only showing top 10 rows

In [37]:
## Most negative subreddit
## Improvement over the original: `.sort('count').filter(...).collect()[-1]`
## collected the entire grouped result (~69K subreddits x sentiment) to the
## driver just to index its last element. Filtering first and taking the top
## row with orderBy(desc).first() keeps the work on the cluster.

most_negative = (df.filter(df.sentiment == "Negative")
                   .groupby('subreddit_name_prefixed').count()
                   .orderBy(desc('count'))
                   .first()['subreddit_name_prefixed'])
print("The most negatively associated (more downvotes on comments than upvotes overall) sub-reddit in the dataset is ", most_negative)

## Most positive subreddit

most_positive = (df.filter(df.sentiment == "Positive")
                   .groupby('subreddit_name_prefixed').count()
                   .orderBy(desc('count'))
                   .first()['subreddit_name_prefixed'])
print("\nThe most positively associated (more upvotes on comments than downvotes overall) sub-reddit in the dataset is ", most_positive)
The most negatively associated (more downvotes on comments than upvotes overall) sub-reddit in the dataset is  r/AskReddit

The most positively associated (more upvotes on comments than downvotes overall) sub-reddit in the dataset is  r/AskReddit
In [38]:
## Making a feature matrix from the comment text. These stage objects are
## reused by the pipelines below, so their names must not change.

# regular expression tokenizer: split body on non-word characters
regexTokenizer = RegexTokenizer(inputCol="body", outputCol="words", pattern="\\W")

# stop words: drop common English stop words from the token list
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

# bag of words count: top-10000 vocabulary, terms must appear in >= 5 documents
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", minDF=5, vocabSize=10000)
In [39]:
## Encoding the target variable: StringIndexer maps sentiment strings to
## numeric labels by descending frequency (most frequent class -> 0.0).
label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")

# Full count-vector pipeline: tokenize -> remove stop words -> count -> index label.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
In [40]:
# Fit the count-vector pipeline to the data and materialize the transformed
# dataset (adds words, filtered, features, label columns).
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
dataset.show(5)
+--------------------+---------+-----------------------+--------------------+--------------------+--------------------+-----+
|                body|sentiment|subreddit_name_prefixed|               words|            filtered|            features|label|
+--------------------+---------+-----------------------+--------------------+--------------------+--------------------+-----+
|          




CHUNG|  Neutral|                r/memes|             [chung]|             [chung]|       (10000,[],[])|  1.0|
|


&gt; Yes it do...|  Neutral|       r/QuakeChampions|[gt, yes, it, doe...|[gt, yes, alterna...|(10000,[2,16,101,...|  1.0|
|


[***First Impr...| Positive|           r/FanFiction|[first, impressio...|[first, impressio...|(10000,[6,7,15,16...|  0.0|
|


[***Made of Co...|  Neutral|         r/HPfanfiction|[made, of, common...|[made, common, cl...|(10000,[1,6,7,15,...|  1.0|
|


[***Say a Pray...|  Neutral|         r/HPfanfiction|[say, a, prayer, ...|[say, prayer, htt...|(10000,[6,7,16,29...|  1.0|
+--------------------+---------+-----------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows

In [41]:
## Splitting dataset into train (90%), and test (10%) sets; fixed seed for
## reproducibility.

(train_set, test_set) = dataset.randomSplit([0.90, 0.10], seed = 10141997)

Modelling

Logistic Regression using Count Vectors

In [42]:
## Logistic Regression on count-vector features: L2-regularized
## (elasticNetParam=0 -> pure ridge penalty), regParam=0.3, up to 20 iterations.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

## Fitting training data on model
lrModel = lr.fit(train_set)

## Getting set of predictions on the held-out test set, then showing the ten
## predictions of class 0 with the highest predicted probability.
predictions = lrModel.transform(test_set)
predictions.filter(predictions['prediction'] == 0).select("body","sentiment",'subreddit_name_prefixed',"probability","label","prediction").orderBy("probability", ascending=False).show(n = 10, truncate = 30)
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
|                          body|sentiment|subreddit_name_prefixed|                   probability|label|prediction|
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
|&gt;Angel Angel Angel Angel...| Positive|            r/Deltarune|[1.0,1.028475449158419E-16,...|  0.0|       0.0|
|rain rain rain rain rain ra...| Positive|        r/ELOcirclejerk|[1.0,3.652425173790645E-20,...|  0.0|       0.0|
|Woah woah woah woah woah wo...| Positive|        r/okbuddyretard|[1.0,1.0895410061950017E-54...|  0.0|       0.0|
|hug hug hug hug hug hug hug...| Positive|             r/Animemes|[0.9999999999999956,4.39280...|  0.0|       0.0|
|MEG
Hi... I'm Meg.
JENNIFER...| Positive|        r/okbuddyretard|[0.9999961154511151,3.78732...|  0.0|       0.0|
|Finally getting to this one...| Positive|        r/adventuretime|[0.9999865746677811,2.23658...|  0.0|       0.0|
|**Accused** - A podcast mad...| Positive|  r/UnresolvedMysteries|[0.9999424219976025,5.71804...|  0.0|       0.0|
|#**#1: The Chainsmokers - C...| Positive|             r/popheads|[0.999923404229012,6.498935...|  0.0|       0.0|
|Boy it's great to have more...| Positive|                r/anime|[0.9999108910732277,5.27753...|  0.0|       0.0|
|When I first saw this artic...|  Neutral|          r/aznidentity|[0.9998943404114136,3.55351...|  1.0|       0.0|
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
only showing top 10 rows

Evaluation (weighted F1, the evaluator's default metric) for LR using Count Vector features

In [43]:
## NOTE(review): MulticlassClassificationEvaluator's default metricName is
## "f1" (weighted F1), not accuracy — the heading above overstates this;
## pass metricName="accuracy" if accuracy is actually wanted.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
Out[43]:
0.46050460593368286

Cross Validation

In [44]:
# Create ParamGrid for Cross Validation: 3 regParam x 3 elasticNetParam = 9
# candidate models.
paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.3, 0.5]).addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]).build())

#  5-fold Cross validation (9 candidates x 5 folds = 45 fits — expensive).
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator = evaluator, numFolds=5)

cvModel = cv.fit(train_set)

# Predictions from the best model found by CV, shown as before.
predictions = cvModel.transform(test_set)
predictions.filter(predictions['prediction'] == 0).select("body","sentiment",'subreddit_name_prefixed',"probability","label","prediction").orderBy("probability", ascending=False).show(n = 10, truncate = 30)
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
|                          body|sentiment|subreddit_name_prefixed|                   probability|label|prediction|
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
|&gt;Angel Angel Angel Angel...| Positive|            r/Deltarune|[1.0,8.89505022109312E-19,1...|  0.0|       0.0|
|hug hug hug hug hug hug hug...| Positive|             r/Animemes|[1.0,1.180980812322745E-19,...|  0.0|       0.0|
|rain rain rain rain rain ra...| Positive|        r/ELOcirclejerk|[1.0,1.4235417241744846E-25...|  0.0|       0.0|
|Woah woah woah woah woah wo...| Positive|        r/okbuddyretard|[1.0,2.995817785420179E-74,...|  0.0|       0.0|
|MEG
Hi... I'm Meg.
JENNIFER...| Positive|        r/okbuddyretard|[0.9999998074823782,1.92442...|  0.0|       0.0|
|Finally getting to this one...| Positive|        r/adventuretime|[0.99999977301404,1.7976459...|  0.0|       0.0|
|Boy it's great to have more...| Positive|                r/anime|[0.9999958288140519,4.04528...|  0.0|       0.0|
|When I first saw this artic...|  Neutral|          r/aznidentity|[0.9999956406954577,3.51163...|  1.0|       0.0|
|#**#1: The Chainsmokers - C...| Positive|             r/popheads|[0.9999953611642894,4.59951...|  0.0|       0.0|
|**Accused** - A podcast mad...| Positive|  r/UnresolvedMysteries|[0.999990304084758,9.695684...|  0.0|       0.0|
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
only showing top 10 rows

Evaluation (weighted F1, the evaluator's default metric) for LR using Count Vector features with cross validation

In [45]:
## NOTE(review): the default metric here is weighted F1 ("f1"), not accuracy,
## despite the heading above.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
Out[45]:
0.47105161391925154

Logistic Regression using TF-IDF Features

In [46]:
## Getting term-frequencies via feature hashing (10000 hash buckets)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

## Getting inverse document frequencies; terms in < 5 documents get weight 0
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

# TF-IDF pipeline: tokenize -> remove stop words -> hash TF -> IDF -> index label.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
In [47]:
# Fit the TF-IDF pipeline to the data (overwrites the count-vector
# pipelineFit/dataset from earlier cells).

pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
In [48]:
## Splitting the TF-IDF dataset into train (90%) and test (10%) sets with the
## same seed as before.

(train_set, test_set) = dataset.randomSplit([0.90, 0.10], seed = 10141997)
In [50]:
## Logistic Regression on TF-IDF features, same hyperparameters as the
## count-vector model for comparability.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

## Fitting training data on model
lrModel = lr.fit(train_set)

predictions = lrModel.transform(test_set)

## Ten most confident class-0 predictions on the test set.
predictions.filter(predictions['prediction'] == 0).select("body","sentiment",'subreddit_name_prefixed',"probability","label","prediction").orderBy("probability", ascending=False).show(n = 10, truncate = 30)
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
|                          body|sentiment|subreddit_name_prefixed|                   probability|label|prediction|
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
|Woah woah woah woah woah wo...| Positive|        r/okbuddyretard|[1.0,2.2908326759537877E-54...|  0.0|       0.0|
|rain rain rain rain rain ra...| Positive|        r/ELOcirclejerk|[0.9999999999999984,1.61366...|  0.0|       0.0|
|Yannick Hendricks, Yannick ...| Positive|                   r/de|[0.999999999999835,3.535354...|  0.0|       0.0|
|hug hug hug hug hug hug hug...| Positive|             r/Animemes|[0.9999999581447112,3.88297...|  0.0|       0.0|
|&gt;Angel Angel Angel Angel...| Positive|            r/Deltarune|[0.9999998554976477,1.44502...|  0.0|       0.0|
|MEG
Hi... I'm Meg.
JENNIFER...| Positive|        r/okbuddyretard|[0.999972504417875,2.211356...|  0.0|       0.0|
|I will advise that doing it...| Positive|           r/FanFiction|[0.9999139903251143,7.52314...|  0.0|       0.0|
|**Accused** - A podcast mad...| Positive|  r/UnresolvedMysteries|[0.9998818819714037,9.06819...|  0.0|       0.0|
|I'd be mortified if people ...| Negative|       r/EnoughMuskSpam|[0.9997285640522175,2.59432...|  2.0|       0.0|
|#**#1: The Chainsmokers - C...| Positive|             r/popheads|[0.9996811465656935,2.80964...|  0.0|       0.0|
+------------------------------+---------+-----------------------+------------------------------+-----+----------+
only showing top 10 rows

Evaluation (weighted F1, the evaluator's default metric) for LR using TF-IDF features

In [51]:
## NOTE(review): reports the evaluator's default metric, weighted F1 ("f1"),
## not accuracy as the heading suggests.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
Out[51]:
0.4556007764403886
In [ ]:
## Shut down the SparkContext to release cluster resources.
sc.stop()