Tutorial-SparkSQL.ipynb

This notebook is part of the Apache Spark training delivered by CERN-IT

Spark SQL Tutorial and Hands-On Lab

Contact: Luca.Canali@cern.ch

Data exploration and data analysis using Spark SQL

This demonstrates how to use Spark SQL - filters, aggregations, and joins - with some additional notes on advanced SQL and Python UDFs

Run this notebook from Jupyter with a Python kernel

  • When using CERN SWAN, do not attach the notebook to a Spark cluster; rather, run it locally on the SWAN container
  • If running this outside CERN SWAN, please make sure to have PySpark installed: pip install pyspark

First let's create a Spark Session

In [ ]:
#
# Local mode: run this when using CERN SWAN not connected to a cluster 
#             or run it on a private Jupyter notebook instance
#             Dependency: PySpark (use SWAN or pip install pyspark)
#

# !pip install pyspark

# Create Spark Session, you need this to work with Spark
from pyspark.sql import SparkSession
spark = (SparkSession.builder 
         .appName("my app")
         .master("local[1]")
         .config("spark.driver.memory","1g")
         .config("spark.ui.showConsoleProgress", "false")
         .getOrCreate()
        )
In [2]:
spark
Out[2]:

SparkSession - in-memory

SparkContext

Spark UI

Version: v3.3.1
Master: local[1]
AppName: my app

Some basic Spark SQL to get started

In [4]:
# What you can do with the declarative DataFrame API you can also do with Spark SQL.
# This uses the DataFrame API:

df = spark.range(10)
df.show()
df.count()
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

Out[4]:
10
In [5]:
# This uses Spark SQL
df2 = spark.sql("select id from range(10)")
df2.count()
Out[5]:
10
In [6]:
# From DataFrame to Spark SQL View

df = spark.createDataFrame([(1, "event1"), (2,"event2"), (3, "event3")], ("id","name"))
df.createOrReplaceTempView("myEvents")

spark.sql("select count(*) from myEvents").show()
+--------+
|count(1)|
+--------+
|       3|
+--------+

In [7]:
# Execution plans
spark.sql("select * from myEvents where id=1").explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('id = 1)
   +- 'UnresolvedRelation [myEvents], [], false

== Analyzed Logical Plan ==
id: bigint, name: string
Project [id#21L, name#22]
+- Filter (id#21L = cast(1 as bigint))
   +- SubqueryAlias myevents
      +- View (`myEvents`, [id#21L,name#22])
         +- LogicalRDD [id#21L, name#22], false

== Optimized Logical Plan ==
Filter (isnotnull(id#21L) AND (id#21L = 1))
+- LogicalRDD [id#21L, name#22], false

== Physical Plan ==
*(1) Filter (isnotnull(id#21L) AND (id#21L = 1))
+- *(1) Scan ExistingRDD[id#21L,name#22]
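
Spark 3 also supports additional explain output modes. A minimal sketch using the "formatted" mode, which gives a more readable summary of the physical plan:

In [ ]:
# Same query, formatted explain output (available since Spark 3.0)
spark.sql("select * from myEvents where id=1").explain(mode="formatted")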

In [8]:
# Another syntax to create test tables
df = spark.sql("select * from values (1, 'event1'), (2,'event2'), (3, 'event3') as (id,name)")

# this will cache the table; note that caching works lazily:
# the table is cached only on first access, which is why we add df.count()
df.cache()
df.count()

# this registers a temporary view called t1
df.createOrReplaceTempView("t1")

# list the tables and views in the catalog
spark.catalog.listTables()
Out[8]:
[Table(name='myevents', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='t1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
In [9]:
# Show the table schema, with 2 different methods

df.printSchema()
spark.sql("describe t1").show()
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|      int|   null|
|    name|   string|   null|
+--------+---------+-------+

In [10]:
# Basic select: list the columns you want to retrieve; you can also add calculated columns

spark.sql("select id, name, id % 2 even_or_odd from t1").show()
+---+------+-----------+
| id|  name|even_or_odd|
+---+------+-----------+
|  1|event1|          1|
|  2|event2|          0|
|  3|event3|          1|
+---+------+-----------+

In [11]:
# Selection with a filter clause

spark.sql("select id, name, id % 2 even_or_odd from t1 where id < 3").show()
+---+------+-----------+
| id|  name|even_or_odd|
+---+------+-----------+
|  1|event1|          1|
|  2|event2|          0|
+---+------+-----------+

In [12]:
# Aggregations with group by 

spark.sql("""select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids 
       from t1 
       group by id % 2""").show()
+-----------+---------+-------+
|Even_or_odd|N_samples|Sum_Ids|
+-----------+---------+-------+
|          1|        2|      4|
|          0|        1|      2|
+-----------+---------+-------+
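
For comparison, a minimal sketch of the same aggregation written with the DataFrame API, using the t1 view registered above:

In [ ]:
from pyspark.sql import functions as F

# Group by id % 2 and compute the same aggregates as the SQL version
(spark.table("t1")
    .groupBy((F.col("id") % 2).alias("Even_or_odd"))
    .agg(F.count("*").alias("N_samples"), F.sum("id").alias("Sum_Ids"))
    .show())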

SQL for data analysis

In [13]:
# Join example using a parent-child relationship

# Create test tables
emp = spark.createDataFrame([(1, "Emp1", 10), (2,"Emp2", 10), (3, "Emp3", 20)], ("id","name","dep_id"))
emp.createOrReplaceTempView("employees")

dep = spark.createDataFrame([(10, "Department1"), (20, "Department2"), (30, "Department3")], ("id","name"))
dep.createOrReplaceTempView("departments")
In [14]:
# Inner join
spark.sql("""
select employees.id, employees.name emp_name, departments.name dep_name
from employees join departments
on employees.dep_id = departments.id
order by employees.id""").toPandas()
Out[14]:
id emp_name dep_name
0 1 Emp1 Department1
1 2 Emp2 Department1
2 3 Emp3 Department2
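
The same inner join can be expressed with the DataFrame API; a minimal sketch using the emp and dep DataFrames created above:

In [ ]:
# Inner join on the parent-child relationship, DataFrame API version
(emp.join(dep, emp.dep_id == dep.id)
    .select(emp.id, emp.name.alias("emp_name"), dep.name.alias("dep_name"))
    .orderBy(emp.id)
    .toPandas())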
In [15]:
# Outer join
spark.sql("""
select departments.id, departments.name dep_name, employees.name emp_name
from departments left outer join employees
on employees.dep_id = departments.id
order by departments.id""").toPandas()
Out[15]:
id dep_name emp_name
0 10 Department1 Emp1
1 10 Department1 Emp2
2 20 Department2 Emp3
3 30 Department3 None
In [16]:
# Window function
spark.sql("""
select id, name, dep_id,
       max(id) over (partition by dep_id) as greatest_id_same_department,
       lag(name) over (order by id) as previous_employee_name
from employees
order by id
""").toPandas()
2022-09-26 16:57:29,433 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-09-26 16:57:29,592 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Out[16]:
id name dep_id greatest_id_same_department previous_employee_name
0 1 Emp1 10 2 None
1 2 Emp2 10 2 Emp1
2 3 Emp3 20 3 Emp2
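
The same window functions are available in the DataFrame API via pyspark.sql.Window; a minimal sketch:

In [ ]:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Window partitioned by department, and a global window ordered by id
w_dep = Window.partitionBy("dep_id")
w_id = Window.orderBy("id")

(emp.select("id", "name", "dep_id",
            F.max("id").over(w_dep).alias("greatest_id_same_department"),
            F.lag("name").over(w_id).alias("previous_employee_name"))
    .orderBy("id")
    .show())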
In [17]:
# A helper "magic function" for running PySpark SQL in Jupyter Notebooks

from IPython.core.magic import register_line_cell_magic
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line 
    return spark.sql(val).limit(max_show_lines).toPandas()
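
The comment above also mentions a %sql_show magic, which is not defined in this excerpt. A minimal sketch of such a companion magic, printing results with show() instead of converting to Pandas, could look like this:

In [ ]:
@register_line_cell_magic
def sql_show(line, cell=None):
    """Execute sql and print the results with DataFrame.show().
    Use: %sql_show or %%sql_show"""
    val = cell if cell is not None else line
    return spark.sql(val).show(max_show_lines, False)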
In [18]:
%%sql_display

select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids 
from t1 
group by id % 2
Out[18]:
Even_or_odd N_samples Sum_Ids
0 1 2 4
1 0 1 2
In [19]:
%%sql_display

-- Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz

select id, case
    when id % 15 = 0 then 'FizzBuzz'
    when id % 3 = 0 then 'Fizz'
    when id % 5 = 0 then 'Buzz'
    else cast(id as string)
    end as FizzBuzz
from range(1,20)
order by id
Out[19]:
id FizzBuzz
0 1 1
1 2 2
2 3 Fizz
3 4 4
4 5 Buzz
5 6 Fizz
6 7 7
7 8 8
8 9 Fizz
9 10 Buzz
10 11 11
11 12 Fizz
12 13 13
13 14 14
14 15 FizzBuzz
15 16 16
16 17 17
17 18 Fizz
18 19 19
In [20]:
# This is the standard way to run Spark SQL with PySpark
# Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz

spark.sql("""
select id, case
    when id % 15 = 0 then 'FizzBuzz'
    when id % 3 = 0 then 'Fizz'
    when id % 5 = 0 then 'Buzz'
    else cast(id as string)
    end as FizzBuzz
from range(1,20)
order by id""").show()
+---+--------+
| id|FizzBuzz|
+---+--------+
|  1|       1|
|  2|       2|
|  3|    Fizz|
|  4|       4|
|  5|    Buzz|
|  6|    Fizz|
|  7|       7|
|  8|       8|
|  9|    Fizz|
| 10|    Buzz|
| 11|      11|
| 12|    Fizz|
| 13|      13|
| 14|      14|
| 15|FizzBuzz|
| 16|      16|
| 17|      17|
| 18|    Fizz|
| 19|      19|
+---+--------+

Processing nested data with arrays, structs and maps

In [21]:
# Prepare test data with arrays
# readings from a sensor measuring temperature in Celsius

schema = "id INT, temp_celsius ARRAY<INT>"

t_list = [1,[35, 36, 32, 30, 40, 42, 38]], [2,[31, 32, 34, 55, 56]]

spark.createDataFrame(t_list, schema).createOrReplaceTempView("temp_data")

spark.sql("select * from temp_data").show(10,False)
+---+----------------------------+
|id |temp_celsius                |
+---+----------------------------+
|1  |[35, 36, 32, 30, 40, 42, 38]|
|2  |[31, 32, 34, 55, 56]        |
+---+----------------------------+

In [22]:
# Example of array functions
# Take first temperature reading and the max temperature reading

spark.sql("select temp_celsius[0] first_temp_reading, array_max(temp_celsius) max_reading from temp_data").toPandas()
Out[22]:
first_temp_reading max_reading
0 35 42
1 31 56
In [23]:
# Array processing with Spark using "higher order functions" in SQL
# Convert an array of temperatures from Celsius to Fahrenheit

spark.sql("""
SELECT id, temp_celsius, 
 transform(temp_celsius, t -> ((t * 9) / 5) + 32) as temp_fahrenheit 
  FROM temp_data
""").toPandas()
Out[23]:
id temp_celsius temp_fahrenheit
0 1 [35, 36, 32, 30, 40, 42, 38] [95.0, 96.8, 89.6, 86.0, 104.0, 107.6, 100.4]
1 2 [31, 32, 34, 55, 56] [87.8, 89.6, 93.2, 131.0, 132.8]
In [24]:
# Array processing using Spark higher order functions in SQL
# Keep only the temperatures > 38C from an array of temperature values

spark.sql("""
SELECT id, temp_celsius, filter(temp_celsius, t -> t > 38) as high 
FROM temp_data
""").show(10,False)
+---+----------------------------+--------+
|id |temp_celsius                |high    |
+---+----------------------------+--------+
|1  |[35, 36, 32, 30, 40, 42, 38]|[40, 42]|
|2  |[31, 32, 34, 55, 56]        |[55, 56]|
+---+----------------------------+--------+

In [25]:
# This demonstrates the "legacy" approach using the SQL functions explode and collect_list.
# Performance is suboptimal, especially for large arrays,
# and the resulting query is also quite hard to read.

spark.sql("""
with exploded_data as (
  select id, explode(temp_celsius) val from temp_data
)
select id, collect_list(val) from exploded_data where val > 38 group by id
""").show(10,False)
+---+-----------------+
|id |collect_list(val)|
+---+-----------------+
|1  |[40, 42]         |
|2  |[55, 56]         |
+---+-----------------+

In [26]:
# Example of array functions: aggregate (a higher order function) and cardinality

# - aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all elements in the array, 
#   and reduces this to a single state. The final state is converted into the final result by applying a finish function.
# - cardinality(expr) - Returns the size of an array or a map. 

spark.sql("""
          SELECT id, aggregate(temp_celsius, 0, (acc, x) -> acc + x,
          acc -> round(acc / cardinality(temp_celsius),1)) as average_temperature
          from temp_data""").show()
+---+-------------------+
| id|average_temperature|
+---+-------------------+
|  1|               36.1|
|  2|               41.6|
+---+-------------------+
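
Since Spark 3.1 the same higher order function is also exposed in the DataFrame API as pyspark.sql.functions.aggregate; a minimal sketch of the equivalent computation:

In [ ]:
from pyspark.sql import functions as F

# Sum the array elements, then divide by the array size to get the average
avg_temp = F.round(
    F.aggregate("temp_celsius", F.lit(0), lambda acc, x: acc + x)
    / F.size("temp_celsius"), 1)

spark.table("temp_data").select("id", avg_temp.alias("average_temperature")).show()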

Structs and arrays

In [27]:
# Example with structs and arrays
# Inspired by physics datasets

schema = "event LONG, HLT struct<flag1:boolean, flag2:boolean>, muons ARRAY<STRUCT<pt:FLOAT, eta:FLOAT, mass:FLOAT>>"

t_list = [[1000, [True,True] , [[1.1,2.2,1.0], [1.2,2.2,1.0]]], [1001, [True,True], [[1.2,2.3, 1.0],[1.3,2.3, 1.0]]]]

df = spark.createDataFrame(t_list, schema)
df.printSchema()

df.createOrReplaceTempView("particles")

spark.sql("select * from particles").show(10,False)
root
 |-- event: long (nullable = true)
 |-- HLT: struct (nullable = true)
 |    |-- flag1: boolean (nullable = true)
 |    |-- flag2: boolean (nullable = true)
 |-- muons: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pt: float (nullable = true)
 |    |    |-- eta: float (nullable = true)
 |    |    |-- mass: float (nullable = true)

+-----+------------+----------------------------------+
|event|HLT         |muons                             |
+-----+------------+----------------------------------+
|1000 |{true, true}|[{1.1, 2.2, 1.0}, {1.2, 2.2, 1.0}]|
|1001 |{true, true}|[{1.2, 2.3, 1.0}, {1.3, 2.3, 1.0}]|
+-----+------------+----------------------------------+

In [28]:
# display only flag1 and the first muon in the list

spark.sql("select event, HLT.flag1, muons[0] from particles").show(10,False)
+-----+-----+---------------+
|event|flag1|muons[0]       |
+-----+-----+---------------+
|1000 |true |{1.1, 2.2, 1.0}|
|1001 |true |{1.2, 2.3, 1.0}|
+-----+-----+---------------+
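
Higher order functions also work on arrays of structs; for example, a minimal sketch extracting the pt values of all muons in each event:

In [ ]:
# transform with struct field access inside the lambda
spark.sql("select event, transform(muons, m -> m.pt) as muons_pt from particles").show(10,False)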

Use maps for storing key-value pairs

In [29]:
schema = "id INT, myKeyValues MAP<INT, INT>"

t_list = [[1000, {1:1, 2:2}], [1001, {1:10, 2:11, 3:12}]]
df = spark.createDataFrame(t_list, schema)
df.createOrReplaceTempView("t1_map")

df.printSchema()
df.show(10,False)
root
 |-- id: integer (nullable = true)
 |-- myKeyValues: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = true)

+----+---------------------------+
|id  |myKeyValues                |
+----+---------------------------+
|1000|{1 -> 1, 2 -> 2}           |
|1001|{1 -> 10, 2 -> 11, 3 -> 12}|
+----+---------------------------+

In [30]:
spark.sql("SELECT id, transform_values(myKeyValues, (k, v) -> k + v) as mymap_transformed from t1_map").show(10, False)
+----+---------------------------+
|id  |mymap_transformed          |
+----+---------------------------+
|1000|{1 -> 2, 2 -> 4}           |
|1001|{1 -> 11, 2 -> 13, 3 -> 15}|
+----+---------------------------+
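
A few more map functions can be handy for exploring key-value data; a minimal sketch using map_keys, map_values and element_at:

In [ ]:
# List the keys and values of each map, and look up the value for key 1
spark.sql("""
SELECT id,
       map_keys(myKeyValues) as keys,
       map_values(myKeyValues) as values,
       element_at(myKeyValues, 1) as value_for_key_1
FROM t1_map
""").show(10,False)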

Python User Defined Functions (UDF)

In [31]:
# Basic Python UDF example

import time
def slowf(s):
  time.sleep(1) # this waits for 1 second
  return 2*s

spark.udf.register("slowf", slowf)

# warmup
spark.sql("select slowf(1)").show()

%time spark.sql("select slowf(1)").show()

%time spark.sql("select id, slowf(id) from range(10)").show()
+--------+
|slowf(1)|
+--------+
|       2|
+--------+

+--------+
|slowf(1)|
+--------+
|       2|
+--------+

CPU times: user 2.72 ms, sys: 1.08 ms, total: 3.8 ms
Wall time: 1.12 s
+---+---------+
| id|slowf(id)|
+---+---------+
|  0|        0|
|  1|        2|
|  2|        4|
|  3|        6|
|  4|        8|
|  5|       10|
|  6|       12|
|  7|       14|
|  8|       16|
|  9|       18|
+---+---------+

CPU times: user 3.33 ms, sys: 1.56 ms, total: 4.89 ms
Wall time: 10.2 s
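
The same Python function can also be used from the DataFrame API by wrapping it with pyspark.sql.functions.udf; a minimal sketch (the name slowf_udf is just illustrative):

In [ ]:
from pyspark.sql.functions import udf, col

# Wrap the existing Python function for use with the DataFrame API
slowf_udf = udf(slowf, "long")

spark.range(3).select(col("id"), slowf_udf(col("id")).alias("slowf_id")).show()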

Pandas UDF - improves on Python UDF

In [32]:
# Python Pandas UDF example
# Pandas UDFs are a faster implementation of Python UDFs

import time
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def multiply_func(a,b):
  time.sleep(1)
  return a * b

spark.udf.register("multiply_func", multiply_func)
Out[32]:
<function __main__.multiply_func(a, b)>
In [33]:
# Can you guess how long it will take to run the UDF with 1 row, 10 rows and 10000 rows?
# Run the following code to see the answer

# warmup
spark.sql("select multiply_func(1,1)").show()

%time spark.sql("select multiply_func(1,1)").show()

%time spark.sql("select id, multiply_func(id,2) from range(10)").show()

num_rows=10000
print(f"\nNow running on {num_rows} rows")
%time a=spark.sql(f"select multiply_func(id,2) from range({num_rows})").collect()
+-------------------+
|multiply_func(1, 1)|
+-------------------+
|                  1|
+-------------------+

+-------------------+
|multiply_func(1, 1)|
+-------------------+
|                  1|
+-------------------+

CPU times: user 2.43 ms, sys: 558 µs, total: 2.99 ms
Wall time: 1.1 s
+---+--------------------+
| id|multiply_func(id, 2)|
+---+--------------------+
|  0|                   0|
|  1|                   2|
|  2|                   4|
|  3|                   6|
|  4|                   8|
|  5|                  10|
|  6|                  12|
|  7|                  14|
|  8|                  16|
|  9|                  18|
+---+--------------------+

CPU times: user 3.48 ms, sys: 190 µs, total: 3.67 ms
Wall time: 1.12 s

Now running on 10000 rows
CPU times: user 39.6 ms, sys: 4.83 ms, total: 44.4 ms
Wall time: 1.39 s
In [ ]:
# What will change if I run on 10001 rows?

num_rows=10001
print(f"\nNow running on {num_rows} rows")
%time a=spark.sql(f"select multiply_func(id,2) from range({num_rows})").collect()
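
Hint: pandas UDFs process data in Arrow record batches, and the UDF above sleeps one second per batch. The batch size is controlled by spark.sql.execution.arrow.maxRecordsPerBatch (default 10000), so with 10001 rows the data is split into two batches and the wall time roughly doubles. A minimal sketch to inspect (or change) the setting:

In [ ]:
# Default is 10000 rows per Arrow batch
print(spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch"))

# It can be changed at runtime, for example:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)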
In [ ]:
# Pandas UDF with type hints for Python 3
# This is the recommended syntax, introduced in Spark 3

import time
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def multiply_func2(a: pd.Series, b: pd.Series) -> pd.Series:
  time.sleep(1)
  return a * b

spark.udf.register("multiply_func2", multiply_func)

# warmup
spark.sql("select multiply_func2(1,1)").show()
In [ ]:
# mapInPandas
# This allows returning a different number of rows than were in the input,
# while still using Arrow serialization for performance
# Note: this uses the DataFrame API

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

df.show()

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
In [ ]:
# End the Spark application
spark.stop()
In [ ]: