Last month, in the Plaksha Tech Leaders Fellowship that I am currently pursuing, I began learning about working with big data and deploying things at scale. During the course, I was introduced to a library called Dask.
"Dask is a flexible library for parallel computing in Python." - the first line of its documentation
Unlike Apache Spark, which is written in Scala and runs on the JVM, Dask is a purely Pythonic library that offers similar functionality.
Dask operates in layers. At the highest level, where we usually work, you have arrays, dataframes, machine learning models, and Bags, a collection type unique to Dask.
Below that sit Delayed and Futures. Delayed builds a directed acyclic graph (DAG) of all the functions and processes that need to be executed before running any of them, whereas Futures works more like eager execution in TensorFlow, i.e. tasks start running as soon as they are submitted.
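To make the Delayed idea concrete, here is a minimal sketch (toy functions, not part of the Yelp analysis): wrapping plain Python functions in delayed only records them in a task graph, and nothing runs until compute() is called. Futures, by contrast, would start executing as soon as they are submitted to a running Client via client.submit.

from dask import delayed

def inc(x):
    return x + 1

def add(x, y):
    return x + y

# Nothing executes here; Dask only records a small task graph (a DAG).
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)

# The graph runs only when we explicitly ask for the result.
print(total.compute())  # 5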
At the lowest level is the scheduler, which is responsible for actually running the DAGs. While there are different schedulers, we will mainly work with the threaded scheduler since our data lives in arrays and dataframes. Its work can be monitored through the Client dashboard, and we will use that throughout the notebook.
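As an aside (a minimal sketch, not part of this notebook's setup), the scheduler can also be picked per compute() call instead of globally:

from dask import delayed

# A tiny graph: take absolute values, then sum them.
lazy_sum = delayed(sum)([delayed(abs)(x) for x in range(-5, 5)])

# The same graph can be run on different schedulers, chosen per call:
print(lazy_sum.compute(scheduler='threads'))      # thread pool, the default for arrays and dataframes
print(lazy_sum.compute(scheduler='processes'))    # process pool
print(lazy_sum.compute(scheduler='synchronous'))  # single-threaded, handy for debugging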
First, we do our imports and read in the data. For this tutorial, we will be using the Yelp dataset.
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import dask
import dask.dataframe as dd
import dask.array as da
from dask import delayed
dask.config.set(scheduler='threads')
Here we start the Client, which will enable us to monitor the scheduler:
from dask.distributed import Client
client = Client(n_workers=8, threads_per_worker=1, processes=False, scheduler_port=0)
client
If you follow the link shown by the client, you will see a dashboard that helps you monitor your Dask tasks. Once you run a command, you should see something similar to this:
All the green means that the tasks were parallelised well and utilised the available hardware efficiently. But if it looks something like this:
it means that the "workers" (the cores of your processor) were not fully utilised.
Another useful screen is the Graph tab, where you can see the DAG of your task.
The blue nodes are completed subtasks, the green nodes are pending ones, and the red ones are in memory and currently being processed.
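If you prefer a static picture of the DAG outside the dashboard, Dask collections also have a .visualize() method. A small aside: it requires the graphviz package, and the file name below is only an example.

import dask.array as da

x = da.ones((1000, 1000), chunks=(250, 250))
y = (x + x.T).sum()

# Writes the task graph to an image file instead of showing it in the dashboard.
y.visualize(filename='task_graph.png')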
Finally, you can go to the Workers tab to see all your workers, along with the memory and load each one is handling.
You can also go to the Status tab to see graphs of how the disk, processor, network, etc. are doing.
While running this notebook, it will be helpful to keep the client dashboard open in one tab and watch how the tasks parallelise and make your computer(s?) turn up the heat.
review = dd.read_json("/kaggle/input/yelp-dataset/yelp_academic_dataset_review.json", lines=True, encoding='utf-8', blocksize="100MB")
business = dd.read_json("/kaggle/input/yelp-dataset/yelp_academic_dataset_business.json", lines=True, encoding='utf-8', blocksize="100MB")
user = dd.read_json("/kaggle/input/yelp-dataset/yelp_academic_dataset_user.json", lines=True, encoding='utf-8', blocksize="100MB")
checkin = dd.read_json("/kaggle/input/yelp-dataset/yelp_academic_dataset_checkin.json", lines=True, encoding='utf-8', blocksize="100MB")
tip = dd.read_json("/kaggle/input/yelp-dataset/yelp_academic_dataset_tip.json", lines=True, encoding='utf-8', blocksize="100MB")
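Note that dd.read_json is lazy: with blocksize="100MB", Dask only records how to read each roughly 100 MB block, and nothing is loaded until a computation asks for it. A quick way to inspect the partitioning (a small sketch, not in the original notebook):

print(review.npartitions)       # how many lazy blocks the file was split into
print(review.columns.tolist())  # column metadata, known from a small sample without a full read
print(len(review))              # this one actually triggers a (parallel) read of every block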
review.head()
business.head()
user.head()
checkin.head()
tip.head()
Now that we know what sort of data we have, let us pick three things that we can try to learn from it.
star_list = business.groupby('state').stars.mean().compute()
plt.figure(figsize=(10,10))
sns.barplot(x=star_list.index, y=star_list.values)
star_list.sort_values(ascending=False)
Clearly, Tennessee and New Jersey have the highest star ratings. But what about the number of reviews? If they have fewer reviews, the averages could be biased. Let us check that.
rev_count_list = business.groupby('state').review_count.sum().compute()
rev_count_list.sort_values()
Aha! So that intuition was right. Both those states have only 3 reviews, which makes for a very skewed sample. Now we can drop the states that don't have at least a three-digit count of reviews and revisit the star ratings.
sufficient_rev_states = rev_count_list.loc[rev_count_list > 100].index.tolist()
star_list = business.groupby('state').stars.mean().compute()
star_rev = pd.concat([star_list[sufficient_rev_states], rev_count_list.loc[rev_count_list > 100]], axis=1)
star_rev.sort_values(by='stars',ascending=False)
This still does not seem to represent the whole picture. If we create a new feature, the average of stars per review, it could be a more accurate representation of how good the restaurants in that state are.
star_count = business.groupby('state').stars.sum().compute()[sufficient_rev_states]
star_rev['star_count'] = star_count
star_rev['stars_per_review'] = star_rev.apply(lambda row: row.star_count / row.review_count, axis = 1)
star_rev.rename(columns={'stars':'star_mean'},inplace=True)
star_rev.sort_values(by='stars_per_review',ascending=False)
California is the state with the highest stars per review, so it can be thought of as the state with the best restaurants.
Well, unless more reviews come in with low ratings, of course.
For our second question, let us find the most commonly appearing words in a good review (5 stars) and in a useful review (top 75th percentile of usefulness), and see how they differ.
review.describe().compute()
useful = review.loc[review['useful'] > 2]
useful.head()
u_text = useful.text.values.compute()
Here we use a Dask array instead of a normal NumPy array. Splitting it into chunks of 1000 reviews lets Dask run functions on the chunks in parallel and aggregate the results before outputting them (a small illustration follows the array creation below).
# Printing some sample reviews
print(u_text[0])
print(u_text[1])
print(u_text[2])
dist_u_text = da.from_array(u_text, chunks=1000)
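As promised above, here is a tiny illustration (not part of the original analysis) of what the chunks buy us, using the dist_u_text array we just created: each block of 1000 reviews is processed independently, and the per-chunk results are aggregated at the end.

# Each 1000-review chunk is mapped to an array of review lengths in parallel,
# then the chunked results are reduced to a single mean.
review_lengths = dist_u_text.map_blocks(
    lambda block: np.array([len(t) for t in block]), dtype=int
)
print(review_lengths.mean().compute())  # average useful-review length in characters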
CountVectorizer is a built-in Scikit-learn class that will help us create a term-frequency matrix. We also use NLTK (the Natural Language Toolkit) to remove stopwords, i.e. words such as "the", "and", and "a". These stopwords are almost certainly the most commonly occurring words and would only add noise to our output, so we remove them.
from sklearn.feature_extraction.text import CountVectorizer
import nltk
nltk.download('stopwords')  # fetch the stopword list if it is not already available locally
from nltk.corpus import stopwords
stop = set(stopwords.words('english'))
We modify the function used by Cristhian Boujon in one of his blog posts to accommodate the removal of stopwords.
# Function from : https://gist.github.com/CristhianBoujon/c719ba2287a630a6d3821d37a9608ac8#file-get_top_n_words-py
def get_top_n_words(corpus, n=None):
    """
    List the top n words in a vocabulary according to occurrence in a text corpus.
    get_top_n_words(["I love Python", "Python is a language programming", "Hello world", "I love the world"]) ->
    [('python', 2),
     ('world', 2),
     ('love', 2),
     ('hello', 1),
     ('is', 1),
     ('programming', 1),
     ('the', 1),
     ('language', 1)]
    """
    vec = CountVectorizer().fit(corpus)
    bag_of_words = vec.transform(corpus)
    sum_words = bag_of_words.sum(axis=0)
    words_freq = []
    for word, idx in vec.vocabulary_.items():
        if word not in stop:  # unlike the original gist, skip stopwords
            words_freq.append((word, sum_words[0, idx]))
    words_freq = sorted(words_freq, key=lambda x: x[1], reverse=True)
    return words_freq[:n]
We wrap the function in delayed so that it becomes a task in the graph and only runs when we call compute() (a sketch of splitting the work across the chunks follows the output below).
top_u_words = delayed(get_top_n_words)(dist_u_text,15)
out_u = top_u_words.compute()
out_u
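As mentioned above, a single delayed call is a single task, so it mainly defers the work rather than splitting it up. Here is a rough sketch (purely illustrative; the count_words helper is mine and uses a plain whitespace split rather than CountVectorizer) of how the counting could instead be fanned out across the chunks and the partial results merged:

from collections import Counter

def count_words(block):
    # Count whitespace-separated tokens in one chunk of reviews, skipping stopwords.
    counts = Counter()
    for text in block:
        counts.update(w for w in text.lower().split() if w not in stop)
    return counts

# One delayed task per 1000-review chunk, merged with a final delayed sum.
per_chunk = [delayed(count_words)(block) for block in dist_u_text.to_delayed().ravel()]
merged = delayed(sum)(per_chunk, Counter())
print(merged.compute().most_common(15))

Either way, we now repeat the same steps for the good (5-star) reviews.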
good = review.loc[review['stars'] == 5]
good.head()
g_text = good.text.values.compute()
dist_g_text = da.from_array(g_text, chunks=1000)
top_g_words = delayed(get_top_n_words)(dist_g_text,15)
out_g = top_g_words.compute()
out_g
While there is some definite overlap, such as "place" and "good", strongly positive words such as "love", "best" and "amazing" appear more in the good reviews, while milder words such as "like" and "really" are more present in the useful reviews. There is also a distinct set of words, "would go back", which might be a commonly occurring phrase in useful reviews. Finally, for our third question, let us see which "elite" Yelp user is the most influential.
user.head()
elites = user[user['elite'] != '']
elites.describe().compute()
famous = elites[elites['fans'] > 75]
famous_sm = famous[['name','review_count','useful','funny','cool']]
famous_sm.head()
sns.violinplot(x=famous_sm['review_count'].compute())  # compute() brings the values into pandas for plotting
# Removing outliers for better plot
famous_to_plot = famous_sm[(famous_sm['useful'] < 10000) & (famous_sm['funny'] < 10000) & (famous_sm['cool'] < 10000)].compute()
plt.subplot(1,3,1)
sns.violinplot(x=famous_to_plot['useful'],cut=0)
plt.subplot(1,3,2)
sns.violinplot(x=famous_to_plot['funny'],cut=0)
plt.subplot(1,3,3)
sns.violinplot(x=famous_to_plot['cool'],cut=0)
From the distributions above, we can assign equal weightage to funny and cool, and a little more weightage to useful.
# meta tells Dask the name and dtype of the output column, since it cannot infer them from the lambda
famous_sm['score'] = famous_sm.apply(lambda row: (0.4*row.useful + 0.3*row.funny + 0.3*row.cool)/(3*row.review_count), meta=(None, 'float64'), axis=1)
famous_sm[famous_sm['score'] > 15].head()
Clearly, Rodney seems to be the most influential: not only does he come out on top of the defined score, but looking at the number of his reviews and their attributes, you can see that he is a loud voice in the Yelp community.
So, hopefully this short exercise was a good introduction to using Dask dataframes and arrays to parallelise work on large datasets. Going forward, the official Dask tutorials are a great next step if you want to keep working with the library.
Thank you for reading and good luck!