{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## This notebook is part of the Apache Spark training delivered by CERN-IT\n",
"### Spark SQL Tutorial and Hands-On Lab\n",
"Contact: Luca.Canali@cern.ch"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data exploration and data analysis using Spark SQL\n",
"*This demostrates how to use Spark SQL - filter, aggregates and joins, with some additional notes on advanced SQL and Python UDF*"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run this notebook from Jupyter with Python kernel\n",
"- When using on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run locally on the SWAN container\n",
"- If running this outside CERN SWAN, please make sure to have PySpark installed: `pip install pyspark`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First let's create a Spark Session"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"#\n",
"# Local mode: run this when using CERN SWAN not connected to a cluster \n",
"# or run it on a private Jupyter notebook instance\n",
"# Dependency: PySpark (use SWAN or pip install pyspark)\n",
"#\n",
"\n",
"# !pip install pyspark\n",
"\n",
"# Create Spark Session, you need this to work with Spark\n",
"from pyspark.sql import SparkSession\n",
"spark = (SparkSession.builder \n",
" .appName(\"my app\")\n",
" .master(\"local[1]\")\n",
" .config(\"spark.driver.memory\",\"1g\")\n",
" .config(\"spark.ui.showConsoleProgress\", \"false\")\n",
" .getOrCreate()\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"
\n",
"
SparkSession - in-memory
\n",
" \n",
"
\n",
"
SparkContext
\n",
"\n",
"
Spark UI
\n",
"\n",
"
\n",
" - Version
\n",
" v3.3.1
\n",
" - Master
\n",
" local[1]
\n",
" - AppName
\n",
" my app
\n",
"
\n",
"
\n",
" \n",
"
\n",
" "
],
"text/plain": [
""
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Some basic Spark SQL to get started"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+\n",
"| id|\n",
"+---+\n",
"| 0|\n",
"| 1|\n",
"| 2|\n",
"| 3|\n",
"| 4|\n",
"| 5|\n",
"| 6|\n",
"| 7|\n",
"| 8|\n",
"| 9|\n",
"+---+\n",
"\n"
]
},
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# What you can do with DataFrame declarative API you can also do with Spark SQL. \n",
"# This uses the DataFrame API:\n",
"\n",
"df = spark.range(10)\n",
"df.show()\n",
"df.count()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# This uses Spark SQL\n",
"df2 = spark.sql(\"select id from range(10)\")\n",
"df2.count()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+\n",
"|count(1)|\n",
"+--------+\n",
"| 3|\n",
"+--------+\n",
"\n"
]
}
],
"source": [
"# From DataFrame to Spark SQL View\n",
"\n",
"df = spark.createDataFrame([(1, \"event1\"), (2,\"event2\"), (3, \"event3\")], (\"id\",\"name\"))\n",
"df.createOrReplaceTempView(\"myEvents\")\n",
"\n",
"spark.sql(\"select count(*) from myEvents\").show()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"== Parsed Logical Plan ==\n",
"'Project [*]\n",
"+- 'Filter ('id = 1)\n",
" +- 'UnresolvedRelation [myEvents], [], false\n",
"\n",
"== Analyzed Logical Plan ==\n",
"id: bigint, name: string\n",
"Project [id#21L, name#22]\n",
"+- Filter (id#21L = cast(1 as bigint))\n",
" +- SubqueryAlias myevents\n",
" +- View (`myEvents`, [id#21L,name#22])\n",
" +- LogicalRDD [id#21L, name#22], false\n",
"\n",
"== Optimized Logical Plan ==\n",
"Filter (isnotnull(id#21L) AND (id#21L = 1))\n",
"+- LogicalRDD [id#21L, name#22], false\n",
"\n",
"== Physical Plan ==\n",
"*(1) Filter (isnotnull(id#21L) AND (id#21L = 1))\n",
"+- *(1) Scan ExistingRDD[id#21L,name#22]\n",
"\n"
]
}
],
"source": [
"# Execution plans\n",
"spark.sql(\"select * from myEvents where id=1\").explain(True)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Table(name='myevents', database=None, description=None, tableType='TEMPORARY', isTemporary=True),\n",
" Table(name='t1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Another syntax to create test tables\n",
"df = spark.sql(\"select * from values (1, 'event1'), (2,'event2'), (3, 'event3') as (id,name)\")\n",
"\n",
"# this will cache the table, note that caching works lazily\n",
"# the table will be cached only after the first access, that's why we add df.count()\n",
"df.cache()\n",
"df.count()\n",
"\n",
"# this registers a temporary view called t1\n",
"df.createOrReplaceTempView(\"t1\")\n",
"\n",
"# list the tables and views in the catalog\n",
"spark.catalog.listTables()"
]
},
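{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional check (a small sketch, not part of the original flow):\n",
"# spark.catalog.isCached reports whether the data behind a table or view is cached.\n",
"# Since t1 was created from the cached (and counted) DataFrame above, this is expected to return True.\n",
"# Use spark.catalog.uncacheTable(\"t1\") to release the cached data when no longer needed.\n",
"\n",
"spark.catalog.isCached(\"t1\")"
]
},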
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- id: integer (nullable = false)\n",
" |-- name: string (nullable = false)\n",
"\n",
"+--------+---------+-------+\n",
"|col_name|data_type|comment|\n",
"+--------+---------+-------+\n",
"| id| int| null|\n",
"| name| string| null|\n",
"+--------+---------+-------+\n",
"\n"
]
}
],
"source": [
"# Show the table schema, with 2 different methods\n",
"\n",
"df.printSchema()\n",
"spark.sql(\"describe t1\").show()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+------+-----------+\n",
"| id| name|even_or_odd|\n",
"+---+------+-----------+\n",
"| 1|event1| 1|\n",
"| 2|event2| 0|\n",
"| 3|event3| 1|\n",
"+---+------+-----------+\n",
"\n"
]
}
],
"source": [
"# Basic select, list the columns you want to retrieve, can also add calculated columns\n",
"\n",
"spark.sql(\"select id, name, id % 2 even_or_odd from t1\").show()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+------+-----------+\n",
"| id| name|even_or_odd|\n",
"+---+------+-----------+\n",
"| 1|event1| 1|\n",
"| 2|event2| 0|\n",
"+---+------+-----------+\n",
"\n"
]
}
],
"source": [
"# Selection with a filter clause\n",
"\n",
"spark.sql(\"select id, name, id % 2 even_or_odd from t1 where id < 3\").show()\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+---------+-------+\n",
"|Even_or_odd|N_samples|Sum_Ids|\n",
"+-----------+---------+-------+\n",
"| 1| 2| 4|\n",
"| 0| 1| 2|\n",
"+-----------+---------+-------+\n",
"\n"
]
}
],
"source": [
"# Aggregations with group by \n",
"\n",
"spark.sql(\"\"\"select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids \n",
" from t1 \n",
" group by id % 2\"\"\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SQL for data analysis"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# Join example using a parent-child relationship\n",
"\n",
"# Create test tables\n",
"emp = spark.createDataFrame([(1, \"Emp1\", 10), (2,\"Emp2\", 10), (3, \"Emp3\", 20)], (\"id\",\"name\",\"dep_id\"))\n",
"emp.createOrReplaceTempView(\"employees\")\n",
"\n",
"dep = spark.createDataFrame([(10, \"Department1\"), (20, \"Department2\"), (30, \"Department3\")], (\"id\",\"name\"))\n",
"dep.createOrReplaceTempView(\"departments\")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" emp_name | \n",
" dep_name | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" Emp1 | \n",
" Department1 | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" Emp2 | \n",
" Department1 | \n",
"
\n",
" \n",
" 2 | \n",
" 3 | \n",
" Emp3 | \n",
" Department2 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id emp_name dep_name\n",
"0 1 Emp1 Department1\n",
"1 2 Emp2 Department1\n",
"2 3 Emp3 Department2"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Inner join\n",
"spark.sql(\"\"\"\n",
"select employees.id, employees.name emp_name, departments.name dep_name\n",
"from employees join departments\n",
"on employees.dep_id = departments.id\n",
"order by employees.id\"\"\").toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" dep_name | \n",
" emp_name | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 10 | \n",
" Department1 | \n",
" Emp1 | \n",
"
\n",
" \n",
" 1 | \n",
" 10 | \n",
" Department1 | \n",
" Emp2 | \n",
"
\n",
" \n",
" 2 | \n",
" 20 | \n",
" Department2 | \n",
" Emp3 | \n",
"
\n",
" \n",
" 3 | \n",
" 30 | \n",
" Department3 | \n",
" None | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id dep_name emp_name\n",
"0 10 Department1 Emp1\n",
"1 10 Department1 Emp2\n",
"2 20 Department2 Emp3\n",
"3 30 Department3 None"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Outer join\n",
"spark.sql(\"\"\"\n",
"select departments.id, departments.name dep_name, employees.name emp_name\n",
"from departments left outer join employees\n",
"on employees.dep_id = departments.id\n",
"order by departments.id\"\"\").toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-09-26 16:57:29,433 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.\n",
"2022-09-26 16:57:29,592 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.\n"
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" name | \n",
" dep_id | \n",
" greatest_id_same_department | \n",
" previous_employee_name | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" Emp1 | \n",
" 10 | \n",
" 2 | \n",
" None | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" Emp2 | \n",
" 10 | \n",
" 2 | \n",
" Emp1 | \n",
"
\n",
" \n",
" 2 | \n",
" 3 | \n",
" Emp3 | \n",
" 20 | \n",
" 3 | \n",
" Emp2 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id name dep_id greatest_id_same_department previous_employee_name\n",
"0 1 Emp1 10 2 None\n",
"1 2 Emp2 10 2 Emp1\n",
"2 3 Emp3 20 3 Emp2"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Window function\n",
"spark.sql(\"\"\"\n",
"select id, name, dep_id,\n",
" max(id) over (partition by dep_id) as greatest_id_same_department,\n",
" lag(name) over (order by id) as previous_employee_name\n",
"from employees\n",
"order by id\n",
"\"\"\").toPandas()"
]
},
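{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional variant (sketch): the WARN messages above come from the lag() window,\n",
"# which has no \"partition by\" clause, so Spark moves all rows to a single partition.\n",
"# Partitioning the lag() window by dep_id keeps the work distributed;\n",
"# the previous employee is then computed within each department.\n",
"\n",
"spark.sql(\"\"\"\n",
"select id, name, dep_id,\n",
"       max(id) over (partition by dep_id) as greatest_id_same_department,\n",
"       lag(name) over (partition by dep_id order by id) as previous_employee_in_department\n",
"from employees\n",
"order by id\n",
"\"\"\").toPandas()"
]
},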
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"# A helper \"magic function\" for running PySpark SQL in Jupyter Notebooks\n",
"\n",
"from IPython.core.magic import register_line_cell_magic\n",
"max_show_lines = 50 # Limit on the number of lines to show with %sql_show and %sql_display\n",
"@register_line_cell_magic\n",
"def sql_display(line, cell=None):\n",
" \"\"\"Execute sql and convert results to Pandas DataFrame for pretty display or further processing.\n",
" Use: %sql_display or %%sql_display\"\"\"\n",
" val = cell if cell is not None else line \n",
" return spark.sql(val).limit(max_show_lines).toPandas() "
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Even_or_odd | \n",
" N_samples | \n",
" Sum_Ids | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" 2 | \n",
" 4 | \n",
"
\n",
" \n",
" 1 | \n",
" 0 | \n",
" 1 | \n",
" 2 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" Even_or_odd N_samples Sum_Ids\n",
"0 1 2 4\n",
"1 0 1 2"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql_display\n",
"\n",
"select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids \n",
"from t1 \n",
"group by id % 2"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" FizzBuzz | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" 1 | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" 2 | \n",
"
\n",
" \n",
" 2 | \n",
" 3 | \n",
" Fizz | \n",
"
\n",
" \n",
" 3 | \n",
" 4 | \n",
" 4 | \n",
"
\n",
" \n",
" 4 | \n",
" 5 | \n",
" Buzz | \n",
"
\n",
" \n",
" 5 | \n",
" 6 | \n",
" Fizz | \n",
"
\n",
" \n",
" 6 | \n",
" 7 | \n",
" 7 | \n",
"
\n",
" \n",
" 7 | \n",
" 8 | \n",
" 8 | \n",
"
\n",
" \n",
" 8 | \n",
" 9 | \n",
" Fizz | \n",
"
\n",
" \n",
" 9 | \n",
" 10 | \n",
" Buzz | \n",
"
\n",
" \n",
" 10 | \n",
" 11 | \n",
" 11 | \n",
"
\n",
" \n",
" 11 | \n",
" 12 | \n",
" Fizz | \n",
"
\n",
" \n",
" 12 | \n",
" 13 | \n",
" 13 | \n",
"
\n",
" \n",
" 13 | \n",
" 14 | \n",
" 14 | \n",
"
\n",
" \n",
" 14 | \n",
" 15 | \n",
" FizzBuzz | \n",
"
\n",
" \n",
" 15 | \n",
" 16 | \n",
" 16 | \n",
"
\n",
" \n",
" 16 | \n",
" 17 | \n",
" 17 | \n",
"
\n",
" \n",
" 17 | \n",
" 18 | \n",
" Fizz | \n",
"
\n",
" \n",
" 18 | \n",
" 19 | \n",
" 19 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id FizzBuzz\n",
"0 1 1\n",
"1 2 2\n",
"2 3 Fizz\n",
"3 4 4\n",
"4 5 Buzz\n",
"5 6 Fizz\n",
"6 7 7\n",
"7 8 8\n",
"8 9 Fizz\n",
"9 10 Buzz\n",
"10 11 11\n",
"11 12 Fizz\n",
"12 13 13\n",
"13 14 14\n",
"14 15 FizzBuzz\n",
"15 16 16\n",
"16 17 17\n",
"17 18 Fizz\n",
"18 19 19"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql_display\n",
"\n",
"-- Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz\n",
" \n",
"select id, case\n",
" when id % 15 = 0 then 'FizzBuzz'\n",
" when id % 3 = 0 then 'Fizz'\n",
" when id % 5 = 0 then 'Buzz'\n",
" else cast(id as string)\n",
" end as FizzBuzz\n",
"from range(1,20)\n",
"order by id"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------+\n",
"| id|FizzBuzz|\n",
"+---+--------+\n",
"| 1| 1|\n",
"| 2| 2|\n",
"| 3| Fizz|\n",
"| 4| 4|\n",
"| 5| Buzz|\n",
"| 6| Fizz|\n",
"| 7| 7|\n",
"| 8| 8|\n",
"| 9| Fizz|\n",
"| 10| Buzz|\n",
"| 11| 11|\n",
"| 12| Fizz|\n",
"| 13| 13|\n",
"| 14| 14|\n",
"| 15|FizzBuzz|\n",
"| 16| 16|\n",
"| 17| 17|\n",
"| 18| Fizz|\n",
"| 19| 19|\n",
"+---+--------+\n",
"\n"
]
}
],
"source": [
"# this is the standard way to run Spark sql with PySpark\n",
"# Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz\n",
"\n",
"spark.sql(\"\"\"\n",
"select id, case\n",
" when id % 15 = 0 then 'FizzBuzz'\n",
" when id % 3 = 0 then 'Fizz'\n",
" when id % 5 = 0 then 'Buzz'\n",
" else cast(id as string)\n",
" end as FizzBuzz\n",
"from range(1,20)\n",
"order by id\"\"\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Processing nested data with arrays, structs and maps"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+----------------------------+\n",
"|id |temp_celsius |\n",
"+---+----------------------------+\n",
"|1 |[35, 36, 32, 30, 40, 42, 38]|\n",
"|2 |[31, 32, 34, 55, 56] |\n",
"+---+----------------------------+\n",
"\n"
]
}
],
"source": [
"# Prepare test data with arrays\n",
"# readings from a sensor measuring temperature in Celsius\n",
"\n",
"schema = \"id INT, temp_celsius ARRAY\"\n",
"\n",
"t_list = [1,[35, 36, 32, 30, 40, 42, 38]], [2,[31, 32, 34, 55, 56]]\n",
"\n",
"spark.createDataFrame(t_list, schema).createOrReplaceTempView(\"temp_data\")\n",
"\n",
"spark.sql(\"select * from temp_data\").show(10,False)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" first_temp_reading | \n",
" max_reading | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 35 | \n",
" 42 | \n",
"
\n",
" \n",
" 1 | \n",
" 31 | \n",
" 56 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" first_temp_reading max_reading\n",
"0 35 42\n",
"1 31 56"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Example of array functions\n",
"# Take first temperature reading and the max temperature reading\n",
"\n",
"spark.sql(\"select temp_celsius[0] first_temp_reading, array_max(temp_celsius) max_reading from temp_data\").toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" temp_celsius | \n",
" temp_fahrenheit | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" [35, 36, 32, 30, 40, 42, 38] | \n",
" [95.0, 96.8, 89.6, 86.0, 104.0, 107.6, 100.4] | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" [31, 32, 34, 55, 56] | \n",
" [87.8, 89.6, 93.2, 131.0, 132.8] | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id temp_celsius \\\n",
"0 1 [35, 36, 32, 30, 40, 42, 38] \n",
"1 2 [31, 32, 34, 55, 56] \n",
"\n",
" temp_fahrenheit \n",
"0 [95.0, 96.8, 89.6, 86.0, 104.0, 107.6, 100.4] \n",
"1 [87.8, 89.6, 93.2, 131.0, 132.8] "
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Array procesing with Spark using \"higher order functions\" in SQL\n",
"# Compute conversion from Fahrenheit from Celsius for an array of temperatures\n",
"\n",
"spark.sql(\"\"\"\n",
"SELECT id, temp_celsius, \n",
" transform(temp_celsius, t -> ((t * 9) / 5) + 32) as temp_fahrenheit \n",
" FROM temp_data\n",
"\"\"\").toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+----------------------------+--------+\n",
"|id |temp_celsius |high |\n",
"+---+----------------------------+--------+\n",
"|1 |[35, 36, 32, 30, 40, 42, 38]|[40, 42]|\n",
"|2 |[31, 32, 34, 55, 56] |[55, 56]|\n",
"+---+----------------------------+--------+\n",
"\n"
]
}
],
"source": [
"# Array procesing using Spark higher order functions in SQL\n",
"# Filter temperatures > 38C from an array of temperature values\n",
"\n",
"spark.sql(\"\"\"\n",
"SELECT id, temp_celsius, filter(temp_celsius, t -> t > 38) as high \n",
"FROM temp_data\n",
"\"\"\").show(10,False)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-----------------+\n",
"|id |collect_list(val)|\n",
"+---+-----------------+\n",
"|1 |[40, 42] |\n",
"|2 |[55, 56] |\n",
"+---+-----------------+\n",
"\n"
]
}
],
"source": [
"# This demonstrates using the \"legacy\" SQL functions explode and collect_list \n",
"# the performance is suboptimal, especially for large arrays\n",
"# also quite hard to read\n",
"\n",
"spark.sql(\"\"\"\n",
"with exploded_data as (\n",
" select id, explode(temp_celsius) val from temp_data\n",
")\n",
"select id, collect_list(val) from exploded_data where val > 38 group by id\n",
"\"\"\").show(10,False)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-------------------+\n",
"| id|average_temperature|\n",
"+---+-------------------+\n",
"| 1| 36.1|\n",
"| 2| 41.6|\n",
"+---+-------------------+\n",
"\n"
]
}
],
"source": [
"# Example of array functions, aggregate (higher orger function) and cardinality\n",
"\n",
"# - aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all elements in the array, \n",
"# and reduces this to a single state. The final state is converted into the final result by applying a finish function.\n",
"# - cardinality(expr) - Returns the size of an array or a map. \n",
"\n",
"spark.sql(\"\"\"\n",
" SELECT id, aggregate(temp_celsius, 0, (acc, x) -> acc + x,\n",
" acc -> round(acc / cardinality(temp_celsius),1)) as average_temperature\n",
" from temp_data\"\"\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Structs and arrays"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- event: long (nullable = true)\n",
" |-- HLT: struct (nullable = true)\n",
" | |-- flag1: boolean (nullable = true)\n",
" | |-- flag2: boolean (nullable = true)\n",
" |-- muons: array (nullable = true)\n",
" | |-- element: struct (containsNull = true)\n",
" | | |-- pt: float (nullable = true)\n",
" | | |-- eta: float (nullable = true)\n",
" | | |-- mass: float (nullable = true)\n",
"\n",
"+-----+------------+----------------------------------+\n",
"|event|HLT |muons |\n",
"+-----+------------+----------------------------------+\n",
"|1000 |{true, true}|[{1.1, 2.2, 1.0}, {1.2, 2.2, 1.0}]|\n",
"|1001 |{true, true}|[{1.2, 2.3, 1.0}, {1.3, 2.3, 1.0}]|\n",
"+-----+------------+----------------------------------+\n",
"\n"
]
}
],
"source": [
"# Example with structs and arrays\n",
"# Inspired from physics datasets\n",
"\n",
"schema = \"event LONG, HLT struct, muons ARRAY>\"\n",
"\n",
"t_list = [[1000, [True,True] , [[1.1,2.2,1.0], [1.2,2.2,1.0]]], [1001, [True,True], [[1.2,2.3, 1.0],[1.3,2.3, 1.0]]]]\n",
"\n",
"df = spark.createDataFrame(t_list, schema)\n",
"df.printSchema()\n",
"\n",
"df.createOrReplaceTempView(\"particles\")\n",
"\n",
"spark.sql(\"select * from particles\").show(10,False)"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+---------------+\n",
"|event|flag1|muons[0] |\n",
"+-----+-----+---------------+\n",
"|1000 |true |{1.1, 2.2, 1.0}|\n",
"|1001 |true |{1.2, 2.3, 1.0}|\n",
"+-----+-----+---------------+\n",
"\n"
]
}
],
"source": [
"# display only flag1 and the first muon in the list\n",
"\n",
"spark.sql(\"select event, HLT.flag1, muons[0] from particles\").show(10,False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use maps for storing key-value pairs"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- id: integer (nullable = true)\n",
" |-- myKeyValues: map (nullable = true)\n",
" | |-- key: integer\n",
" | |-- value: integer (valueContainsNull = true)\n",
"\n",
"+----+---------------------------+\n",
"|id |myKeyValues |\n",
"+----+---------------------------+\n",
"|1000|{1 -> 1, 2 -> 2} |\n",
"|1001|{1 -> 10, 2 -> 11, 3 -> 12}|\n",
"+----+---------------------------+\n",
"\n"
]
}
],
"source": [
"schema = \"id INT, myKeyValues MAP\"\n",
"\n",
"t_list = [[1000, {1:1, 2:2}], [1001, {1:10, 2:11, 3:12}]]\n",
"df = spark.createDataFrame(t_list, schema)\n",
"df.createOrReplaceTempView(\"t1_map\")\n",
"\n",
"df.printSchema()\n",
"df.show(10,False)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------------------------+\n",
"|id |mymap_transformed |\n",
"+----+---------------------------+\n",
"|1000|{1 -> 2, 2 -> 4} |\n",
"|1001|{1 -> 11, 2 -> 13, 3 -> 15}|\n",
"+----+---------------------------+\n",
"\n"
]
}
],
"source": [
"spark.sql(\"SELECT id, transform_values(myKeyValues, (k, v) -> k + v) as mymap_transformed from t1_map\").show(10, False)"
]
},
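{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A couple of additional map operations (sketch):\n",
"# element access by key with map[key] and listing the keys with map_keys\n",
"\n",
"spark.sql(\"select id, myKeyValues[1] as value_for_key_1, map_keys(myKeyValues) as key_list from t1_map\").show(10, False)"
]
},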
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Python User Defined Functions (UDF)"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+\n",
"|slowf(1)|\n",
"+--------+\n",
"| 2|\n",
"+--------+\n",
"\n",
"+--------+\n",
"|slowf(1)|\n",
"+--------+\n",
"| 2|\n",
"+--------+\n",
"\n",
"CPU times: user 2.72 ms, sys: 1.08 ms, total: 3.8 ms\n",
"Wall time: 1.12 s\n",
"+---+---------+\n",
"| id|slowf(id)|\n",
"+---+---------+\n",
"| 0| 0|\n",
"| 1| 2|\n",
"| 2| 4|\n",
"| 3| 6|\n",
"| 4| 8|\n",
"| 5| 10|\n",
"| 6| 12|\n",
"| 7| 14|\n",
"| 8| 16|\n",
"| 9| 18|\n",
"+---+---------+\n",
"\n",
"CPU times: user 3.33 ms, sys: 1.56 ms, total: 4.89 ms\n",
"Wall time: 10.2 s\n"
]
}
],
"source": [
"# Basic Python UDF example\n",
"\n",
"import time\n",
"def slowf(s):\n",
" time.sleep(1) # this waits for 1 second\n",
" return 2*s\n",
"\n",
"spark.udf.register(\"slowf\", slowf)\n",
"\n",
"# warmup\n",
"spark.sql(\"select slowf(1)\").show()\n",
"\n",
"%time spark.sql(\"select slowf(1)\").show()\n",
"\n",
"%time spark.sql(\"select id, slowf(id) from range(10)\").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Pandas UDF - improves on Python UDF"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {
"scrolled": false
},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Python Pandas UDF example\n",
"# Pandas UDF are a faster implementation for Python UDF\n",
"\n",
"import time\n",
"import pandas as pd\n",
"from pyspark.sql.functions import pandas_udf\n",
"\n",
"@pandas_udf('long')\n",
"def multiply_func(a,b):\n",
" time.sleep(1)\n",
" return a * b\n",
"\n",
"spark.udf.register(\"multiply_func\", multiply_func)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------+\n",
"|multiply_func(1, 1)|\n",
"+-------------------+\n",
"| 1|\n",
"+-------------------+\n",
"\n",
"+-------------------+\n",
"|multiply_func(1, 1)|\n",
"+-------------------+\n",
"| 1|\n",
"+-------------------+\n",
"\n",
"CPU times: user 2.43 ms, sys: 558 µs, total: 2.99 ms\n",
"Wall time: 1.1 s\n",
"+---+--------------------+\n",
"| id|multiply_func(id, 2)|\n",
"+---+--------------------+\n",
"| 0| 0|\n",
"| 1| 2|\n",
"| 2| 4|\n",
"| 3| 6|\n",
"| 4| 8|\n",
"| 5| 10|\n",
"| 6| 12|\n",
"| 7| 14|\n",
"| 8| 16|\n",
"| 9| 18|\n",
"+---+--------------------+\n",
"\n",
"CPU times: user 3.48 ms, sys: 190 µs, total: 3.67 ms\n",
"Wall time: 1.12 s\n",
"\n",
"Now running on 10000 rows\n",
"CPU times: user 39.6 ms, sys: 4.83 ms, total: 44.4 ms\n",
"Wall time: 1.39 s\n"
]
}
],
"source": [
"# Can you guess how long will it take to run the udf with 1 row, 10 rows and 10000 rows?\n",
"# Run the followin cell to see the answer\n",
"\n",
"# warmup\n",
"spark.sql(\"select multiply_func(1,1)\").show()\n",
"\n",
"%time spark.sql(\"select multiply_func(1,1)\").show()\n",
"\n",
"%time spark.sql(\"select id, multiply_func(id,2) from range(10)\").show()\n",
"\n",
"num_rows=10000\n",
"print(f\"\\nNow running on {num_rows} rows\")\n",
"%time a=spark.sql(f\"select multiply_func(id,2) from range({num_rows})\").collect()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# What will change if I run on 10001 rows?\n",
"\n",
"num_rows=10001\n",
"print(f\"\\nNow running on {num_rows} rows\")\n",
"%time a=spark.sql(f\"select multiply_func(id,2) from range({num_rows})\").collect()"
]
},
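{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A possible explanation (sketch, assuming the default configuration):\n",
"# Pandas UDFs receive their input as Arrow record batches, whose maximum size is set by\n",
"# spark.sql.execution.arrow.maxRecordsPerBatch (10000 rows by default).\n",
"# With 10001 rows the UDF is invoked on two batches, so the 1-second sleep runs twice.\n",
"\n",
"spark.conf.get(\"spark.sql.execution.arrow.maxRecordsPerBatch\", \"10000\")"
]
},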
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pandas UDF with type hints for Python 3\n",
"# This is the recommended syntax introduced Spark 3\n",
"\n",
"import time\n",
"import pandas as pd\n",
"from pyspark.sql.functions import pandas_udf\n",
"\n",
"@pandas_udf('long')\n",
"def multiply_func2(a: pd.Series, b: pd.Series) -> pd.Series:\n",
" time.sleep(1)\n",
" return a * b\n",
"\n",
"spark.udf.register(\"multiply_func2\", multiply_func)\n",
"\n",
"# warmup\n",
"spark.sql(\"select multiply_func2(1,1)\").show()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# mapInPandas\n",
"# This allows to return a different number of rows than what was in the input\n",
"# while still using arrow serialization for performance\n",
"# Note: this uses the DataFrame API\n",
"\n",
"df = spark.createDataFrame([(1, 21), (2, 30)], (\"id\", \"age\"))\n",
"\n",
"df.show()\n",
"\n",
"def filter_func(iterator):\n",
" for pdf in iterator:\n",
" yield pdf[pdf.id == 1]\n",
"\n",
"df.mapInPandas(filter_func, schema=df.schema).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# End the Spark application\n",
"spark.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
}
},
"nbformat": 4,
"nbformat_minor": 1
}