Tutorial-SparkSQL.ipynb
Data exploration and data analysis using Spark SQL¶
This notebook demonstrates how to use Spark SQL: filters, aggregates and joins, with additional notes on advanced SQL and Python UDFs
Run this notebook from Jupyter with a Python kernel
- When using CERN SWAN, do not attach the notebook to a Spark cluster, but rather run it locally on the SWAN container
- If running this outside CERN SWAN, please make sure to have PySpark installed:
pip install pyspark
First let's create a Spark Session¶
In [ ]:
#
# Local mode: run this when using CERN SWAN not connected to a cluster
# or run it on a private Jupyter notebook instance
# Dependency: PySpark (use SWAN or pip install pyspark)
#
# !pip install pyspark
# Create Spark Session, you need this to work with Spark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("my app")
         .master("local[1]")
         .config("spark.driver.memory", "1g")
         .config("spark.ui.showConsoleProgress", "false")
         .getOrCreate()
        )
In [2]:
spark
Out[2]:
Some basic Spark SQL to get started¶
In [4]:
# What you can do with the DataFrame declarative API you can also do with Spark SQL.
# This uses the DataFrame API:
df = spark.range(10)
df.show()
df.count()
Out[4]:
In [5]:
# This uses Spark SQL
df2 = spark.sql("select id from range(10)")
df2.count()
Out[5]:
In [6]:
# From DataFrame to Spark SQL View
df = spark.createDataFrame([(1, "event1"), (2,"event2"), (3, "event3")], ("id","name"))
df.createOrReplaceTempView("myEvents")
spark.sql("select count(*) from myEvents").show()
In [7]:
# Execution plans
spark.sql("select * from myEvents where id=1").explain(True)
In [8]:
# Another syntax to create test tables
df = spark.sql("select * from values (1, 'event1'), (2,'event2'), (3, 'event3') as (id,name)")
# this will cache the table, note that caching works lazily
# the table will be cached only after the first access, that's why we add df.count()
df.cache()
df.count()
# this registers a temporary view called t1
df.createOrReplaceTempView("t1")
# list the tables and views in the catalog
spark.catalog.listTables()
Out[8]:
In [9]:
# Show the table schema, with 2 different methods
df.printSchema()
spark.sql("describe t1").show()
In [10]:
# Basic select, list the columns you want to retrieve, can also add calculated columns
spark.sql("select id, name, id % 2 even_or_odd from t1").show()
In [11]:
# Selection with a filter clause
spark.sql("select id, name, id % 2 even_or_odd from t1 where id < 3").show()
In [12]:
# Aggregations with group by
spark.sql("""select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids
from t1
group by id % 2""").show()
SQL for data analysis¶
In [13]:
# Join example using a parent-child relationship
# Create test tables
emp = spark.createDataFrame([(1, "Emp1", 10), (2,"Emp2", 10), (3, "Emp3", 20)], ("id","name","dep_id"))
emp.createOrReplaceTempView("employees")
dep = spark.createDataFrame([(10, "Department1"), (20, "Department2"), (30, "Department3")], ("id","name"))
dep.createOrReplaceTempView("departments")
In [14]:
# Inner join
spark.sql("""
select employees.id, employees.name emp_name, departments.name dep_name
from employees join departments
on employees.dep_id = departments.id
order by employees.id""").toPandas()
Out[14]:
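A minimal sketch of the same inner join expressed with the DataFrame API, using the emp and dep DataFrames created above:
In [ ]:
# Sketch: the same inner join with the DataFrame API
(emp.join(dep, emp.dep_id == dep.id)
    .select(emp.id, emp.name.alias("emp_name"), dep.name.alias("dep_name"))
    .orderBy(emp.id)
    .toPandas())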
In [15]:
# Outer join
spark.sql("""
select departments.id, departments.name dep_name, employees.name emp_name
from departments left outer join employees
on employees.dep_id = departments.id
order by departments.id""").toPandas()
Out[15]:
In [16]:
# Window function
spark.sql("""
select id, name, dep_id,
max(id) over (partition by dep_id) as greatest_id_same_department,
lag(name) over (order by id) as previous_employee_name
from employees
order by id
""").toPandas()
Out[16]:
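The window functions can also be sketched with the DataFrame API and a Window specification:
In [ ]:
# Sketch: equivalent window functions with the DataFrame API
import pyspark.sql.functions as F
from pyspark.sql import Window
(emp.withColumn("greatest_id_same_department", F.max("id").over(Window.partitionBy("dep_id")))
    .withColumn("previous_employee_name", F.lag("name").over(Window.orderBy("id")))
    .orderBy("id")
    .toPandas())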
In [17]:
# A helper "magic function" for running PySpark SQL in Jupyter Notebooks
from IPython.core.magic import register_line_cell_magic
max_show_lines = 50 # Limit on the number of lines to show with %sql_display
@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute SQL and convert the results to a Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line
    return spark.sql(val).limit(max_show_lines).toPandas()
In [18]:
%%sql_display
select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids
from t1
group by id % 2
Out[18]:
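The helper also works as a line magic for short one-line queries, for example:
In [ ]:
%sql_display select * from t1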
In [19]:
%%sql_display
-- Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz
select id, case
when id % 15 = 0 then 'FizzBuzz'
when id % 3 = 0 then 'Fizz'
when id % 5 = 0 then 'Buzz'
else cast(id as string)
end as FizzBuzz
from range(1,20)
order by id
Out[19]:
In [20]:
# this is the standard way to run Spark SQL with PySpark
# Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz
spark.sql("""
select id, case
when id % 15 = 0 then 'FizzBuzz'
when id % 3 = 0 then 'Fizz'
when id % 5 = 0 then 'Buzz'
else cast(id as string)
end as FizzBuzz
from range(1,20)
order by id""").show()
Processing nested data with arrays, structs and maps¶
In [21]:
# Prepare test data with arrays
# readings from a sensor measuring temperature in Celsius
schema = "id INT, temp_celsius ARRAY<INT>"
t_list = [[1, [35, 36, 32, 30, 40, 42, 38]], [2, [31, 32, 34, 55, 56]]]
spark.createDataFrame(t_list, schema).createOrReplaceTempView("temp_data")
spark.sql("select * from temp_data").show(10,False)
In [22]:
# Example of array functions
# Take first temperature reading and the max temperature reading
spark.sql("select temp_celsius[0] first_temp_reading, array_max(temp_celsius) max_reading from temp_data").toPandas()
Out[22]:
In [23]:
# Array processing with Spark using "higher order functions" in SQL
# Compute the conversion from Celsius to Fahrenheit for an array of temperatures
spark.sql("""
SELECT id, temp_celsius,
transform(temp_celsius, t -> ((t * 9) / 5) + 32) as temp_fahrenheit
FROM temp_data
""").toPandas()
Out[23]:
In [24]:
# Array processing using Spark higher order functions in SQL
# Filter temperatures > 38C from an array of temperature values
spark.sql("""
SELECT id, temp_celsius, filter(temp_celsius, t -> t > 38) as high
FROM temp_data
""").show(10,False)
In [25]:
# This demonstrates using the "legacy" SQL functions explode and collect_list
# the performance is suboptimal, especially for large arrays
# also quite hard to read
spark.sql("""
with exploded_data as (
select id, explode(temp_celsius) val from temp_data
)
select id, collect_list(val) from exploded_data where val > 38 group by id
""").show(10,False)
In [26]:
# Example of array functions: aggregate (a higher order function) and cardinality
# - aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all elements in the array,
# and reduces this to a single state. The final state is converted into the final result by applying a finish function.
# - cardinality(expr) - Returns the size of an array or a map.
spark.sql("""
SELECT id, aggregate(temp_celsius, 0, (acc, x) -> acc + x,
acc -> round(acc / cardinality(temp_celsius),1)) as average_temperature
from temp_data""").show()
Structs and arrays¶
In [27]:
# Example with structs and arrays
# Inspired from physics datasets
schema = "event LONG, HLT struct<flag1:boolean, flag2:boolean>, muons ARRAY<STRUCT<pt:FLOAT, eta:FLOAT, mass:FLOAT>>"
t_list = [[1000, [True,True] , [[1.1,2.2,1.0], [1.2,2.2,1.0]]], [1001, [True,True], [[1.2,2.3, 1.0],[1.3,2.3, 1.0]]]]
df = spark.createDataFrame(t_list, schema)
df.printSchema()
df.createOrReplaceTempView("particles")
spark.sql("select * from particles").show(10,False)
In [28]:
# display only flag1 and the first muon in the list
spark.sql("select event, HLT.flag1, muons[0] from particles").show(10,False)
Use maps for storing key-value pairs¶
In [29]:
schema = "id INT, myKeyValues MAP<INT, INT>"
t_list = [[1000, {1:1, 2:2}], [1001, {1:10, 2:11, 3:12}]]
df = spark.createDataFrame(t_list, schema)
df.createOrReplaceTempView("t1_map")
df.printSchema()
df.show(10,False)
In [30]:
spark.sql("SELECT id, transform_values(myKeyValues, (k, v) -> k + v) as mymap_transformed from t1_map").show(10, False)
Python User Defined Functions (UDF)¶
In [31]:
# Basic Python UDF example
import time
def slowf(s):
    time.sleep(1) # this waits for 1 second
    return 2*s
spark.udf.register("slowf", slowf)
# warmup
spark.sql("select slowf(1)").show()
%time spark.sql("select slowf(1)").show()
%time spark.sql("select id, slowf(id) from range(10)").show()
Pandas UDF - improves on Python UDF¶
In [32]:
# Python Pandas UDF example
# Pandas UDFs are a faster implementation of Python UDFs
import time
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def multiply_func(a, b):
    time.sleep(1)
    return a * b
spark.udf.register("multiply_func", multiply_func)
Out[32]:
In [33]:
# Can you guess how long it will take to run the UDF with 1 row, 10 rows and 10000 rows?
# Run the following commands to see the answer
# warmup
spark.sql("select multiply_func(1,1)").show()
%time spark.sql("select multiply_func(1,1)").show()
%time spark.sql("select id, multiply_func(id,2) from range(10)").show()
num_rows=10000
print(f"\nNow running on {num_rows} rows")
%time a=spark.sql(f"select multiply_func(id,2) from range({num_rows})").collect()
In [ ]:
# What will change if I run on 10001 rows?
num_rows=10001
print(f"\nNow running on {num_rows} rows")
%time a=spark.sql(f"select multiply_func(id,2) from range({num_rows})").collect()
In [ ]:
# Pandas UDF with type hints for Python 3
# This is the recommended syntax introduced in Spark 3
import time
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def multiply_func2(a: pd.Series, b: pd.Series) -> pd.Series:
    time.sleep(1)
    return a * b
spark.udf.register("multiply_func2", multiply_func2)
# warmup
spark.sql("select multiply_func2(1,1)").show()
In [ ]:
# mapInPandas
# This allows returning a different number of rows than were in the input,
# while still using Arrow serialization for performance
# Note: this uses the DataFrame API
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
df.show()
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
In [ ]:
# End the Spark application
spark.stop()
In [ ]: