HandsOn-SparkSQL_with_solutions.ipynb
Run this notebook from Jupyter with a Python kernel
- When using it on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run locally on the SWAN container
- If running this outside CERN SWAN, please make sure you have PySpark installed:
pip install pyspark
Example datasets¶
The following examples use sample data provided in the repository.
We will use the movielens dataset from Kaggle, credits: https://www.kaggle.com/grouplens/movielens-20m-dataset
In [ ]:
# Create Spark Session, you need this to work with Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("My spark example app") \
.master("local[*]") \
.config("spark.driver.memory","8g") \
.config("spark.ui.showConsoleProgress", "false") \
.getOrCreate()
In [2]:
spark
Out[2]:
In [3]:
# sets the path to the directory with datafiles
PATH = "../data/"
In [4]:
ratings = spark.read.option("header","true").csv(PATH + "ratings1.csv.gz")
In [5]:
ratings.show(5)
In [6]:
ratings.printSchema()
In [7]:
# schema inference needs to go through the data to estimate the schema, which takes time
%time ratings = spark.read.option("header","true").option("inferSchema", "true").csv(PATH + "ratings1.csv.gz")
# note: ratings*.csv.gz would read both ratings1.csv.gz and ratings2.csv.gz, i.e. more data, but slower to run
# spark.read.option("header","true").option("inferSchema", "true").csv(PATH + "ratings*.csv.gz")
In [8]:
ratings.printSchema()
In [9]:
# movielens dataset
movies = spark.read.option("header","true").option("inferSchema", "true").csv(PATH + "movies.csv.gz")
In [10]:
movies.show(5, False)
In [11]:
movies.printSchema()
In [12]:
tags = spark.read.option("header","true").option("inferSchema", "true").csv(PATH + "tags.csv.gz")
In [13]:
tags.show(5)
Register the dataframes as Spark Temporary Views¶
In [14]:
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")
tags.createOrReplaceTempView("tags")
In [15]:
# note what happens when we query a table in a csv file with a filter
spark.sql("select movieId, title from movies where movieId=1").explain()
In [16]:
# cache the tables, to improve the performance of the rest of the queries in the notebook
# note: default caching level is MEMORY_AND_DISK (i.e. caching in memory if enough heap is available)
# note: caching is lazily executed, so a count() action is added to make the operation happen
# this operation may take a couple of minutes
r = ratings.cache().count()
m = movies.cache().count()
t = tags.cache().count()
In [17]:
print(f"Num ratings = {r}\nNum tags = {t}\nNum movies = {m}")
In [18]:
# Add the column Year to "movies"
movies_year = spark.sql("select *, regexp_extract(title,'^(.*) \\\\(([0-9 \\\\-]*)\\\\)$',2) as Year from movies")
movies_year.show(5,False)
movies_year.createOrReplaceTempView("movies_year")
# This is the DataFrame API equivalent; note that \\ oddly needs to be changed to \\\\ when using SQL, at least in this version of Spark
# from pyspark.sql.functions import regexp_extract
# movies_year = movies.withColumn("Year",regexp_extract("title",'^(.*) \\(([0-9 \\-]*)\\)$',2))
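The extraction pattern can be checked outside Spark with Python's built-in `re` module (here in the single-backslash form, since no SQL string escaping is involved):

```python
import re

# same pattern as above: group 1 is the title, group 2 the year in parentheses
pattern = r'^(.*) \(([0-9 \-]*)\)$'
m = re.match(pattern, "Toy Story (1995)")
print(m.group(1))  # → Toy Story
print(m.group(2))  # → 1995
```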
In [19]:
# number of movies per year
m_yr = spark.sql("select year, count(1) as count from movies_year group by year order by year").toPandas()
In [20]:
%matplotlib notebook
import matplotlib.pyplot as plt
import pandas as pd
plt.style.use('seaborn-darkgrid')  # on matplotlib >= 3.6 this style is named 'seaborn-v0_8-darkgrid'
In [21]:
m_yr.plot(x='year',y='count',kind='line', title='Movies per year');
2) Top movies by number of ratings¶
In [22]:
# A query to perform a join operation between movies and ratings
# Find the movies with the highest number of ratings
spark.sql("""
select title, count(*)
from movies m, ratings r
where m.movieId = r.movieId
group by title
order by 2 desc""").limit(5).toPandas()
Out[22]:
In [23]:
spark.sql("""
select title, count(*)
from movies m, ratings r
where m.movieId = r.movieId
group by title
order by 2 desc""").explain(True)
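For comparison, the same join-and-count can be expressed with the DataFrame API. This is a sketch wrapped in a hypothetical helper function; it assumes the movies and ratings DataFrames created earlier in the notebook:

```python
# DataFrame API equivalent of the SQL join above (a sketch)
def top_by_rating_count(movies_df, ratings_df, n=5):
    # join on movieId, count ratings per title, take the n largest counts
    return (movies_df.join(ratings_df, "movieId")
                     .groupBy("title")
                     .count()
                     .orderBy("count", ascending=False)
                     .limit(n))

# top_by_rating_count(movies, ratings).show(truncate=False)
```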
3) Highly rated movies¶
Find the top 5 highly rated movies
In [24]:
spark.sql("""select title, avg(rating) as avg_rating from movies m, ratings r
where m.movieId = r.movieId
group by title
order by 2 desc""").show(5, False)
Drill down on the top entries:¶
- How many reviews contributed to this rating?
In [25]:
spark.sql("""select title, avg(rating) as avg_rating, count(*) as count from movies m, ratings r
where m.movieId = r.movieId
group by title
order by 2 desc""").show(5, False)
Let's only take into account movies that have more than 100 reviews
In [26]:
spark.sql("""select title, avg(rating) as avg_rating, count(*) as count from movies m, ratings r
where m.movieId = r.movieId
group by title
having count(*) > 100
order by 2 desc""").limit(20).toPandas()
Out[26]:
4) Find the top rated movie of every year after 2000¶
In [27]:
avg_ratings = spark.sql("""select year, title, round(avg(rating),2) as avg_rating, count(*) as count
from movies_year m, ratings r where m.movieId = r.movieId
group by year, title
having count(*) > 100""")
avg_ratings.createOrReplaceTempView("avg_ratings")
# note this is just the definition of a helper view
# because of lazy execution no query is run at this step
In [28]:
# the query for top-rated movies is run here, triggered by the action to show the first 20 rows
spark.sql("""select a.year, a.title, avg_rating from avg_ratings a,
(select year, max(avg_rating) as max_rating from avg_ratings group by year) m
where a.year = m.year
and a.avg_rating = m.max_rating
and a.year > 2000
order by year""").show(20, False)
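The self-join above can also be written with a window function: `rank()` partitioned by year orders the titles by average rating, and the outer filter keeps the top-ranked title per year. A sketch of the alternative query (as with the self-join, ties would return more than one row per year):

```python
# window-function alternative to the self-join against avg_ratings
top_per_year_sql = """
select year, title, avg_rating from (
  select year, title, avg_rating,
         rank() over (partition by year order by avg_rating desc) as rnk
  from avg_ratings) ranked
where rnk = 1 and year > 2000
order by year
"""
# spark.sql(top_per_year_sql).show(20, False)
```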
In [29]:
# End the Spark application
spark.stop()
In [ ]: