{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## This notebook is part of the Apache Spark training delivered by CERN-IT\n", "### Spark SQL Tutorial and Hands-On Lab\n", "Contact: Luca.Canali@cern.ch" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data exploration and data analysis using Spark SQL\n", "*This demostrates how to use Spark SQL - filter, aggregates and joins, with some additional notes on advanced SQL and Python UDF*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run this notebook from Jupyter with Python kernel\n", "- When using on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run locally on the SWAN container\n", "- If running this outside CERN SWAN, please make sure to have PySpark installed: `pip install pyspark`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### First let's create a Spark Session" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "#\n", "# Local mode: run this when using CERN SWAN not connected to a cluster \n", "# or run it on a private Jupyter notebook instance\n", "# Dependency: PySpark (use SWAN or pip install pyspark)\n", "#\n", "\n", "# !pip install pyspark\n", "\n", "# Create Spark Session, you need this to work with Spark\n", "from pyspark.sql import SparkSession\n", "spark = (SparkSession.builder \n", " .appName(\"my app\")\n", " .master(\"local[1]\")\n", " .config(\"spark.driver.memory\",\"1g\")\n", " .config(\"spark.ui.showConsoleProgress\", \"false\")\n", " .getOrCreate()\n", " )\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.1
\n", "
Master
\n", "
local[1]
\n", "
AppName
\n", "
my app
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Some basic Spark SQL to get started" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "| id|\n", "+---+\n", "| 0|\n", "| 1|\n", "| 2|\n", "| 3|\n", "| 4|\n", "| 5|\n", "| 6|\n", "| 7|\n", "| 8|\n", "| 9|\n", "+---+\n", "\n" ] }, { "data": { "text/plain": [ "10" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# What you can do with DataFrame declarative API you can also do with Spark SQL. \n", "# This uses the DataFrame API:\n", "\n", "df = spark.range(10)\n", "df.show()\n", "df.count()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "10" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# This uses Spark SQL\n", "df2 = spark.sql(\"select id from range(10)\")\n", "df2.count()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 3|\n", "+--------+\n", "\n" ] } ], "source": [ "# From DataFrame to Spark SQL View\n", "\n", "df = spark.createDataFrame([(1, \"event1\"), (2,\"event2\"), (3, \"event3\")], (\"id\",\"name\"))\n", "df.createOrReplaceTempView(\"myEvents\")\n", "\n", "spark.sql(\"select count(*) from myEvents\").show()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "== Parsed Logical Plan ==\n", "'Project [*]\n", "+- 'Filter ('id = 1)\n", " +- 'UnresolvedRelation [myEvents], [], false\n", "\n", "== Analyzed Logical Plan ==\n", "id: bigint, name: string\n", "Project [id#21L, name#22]\n", "+- Filter (id#21L = cast(1 as bigint))\n", " +- SubqueryAlias myevents\n", " +- View (`myEvents`, [id#21L,name#22])\n", " +- LogicalRDD [id#21L, name#22], false\n", "\n", "== Optimized Logical Plan ==\n", "Filter (isnotnull(id#21L) AND (id#21L = 1))\n", "+- LogicalRDD [id#21L, name#22], false\n", "\n", "== Physical Plan ==\n", "*(1) Filter (isnotnull(id#21L) AND (id#21L = 1))\n", "+- *(1) Scan ExistingRDD[id#21L,name#22]\n", "\n" ] } ], "source": [ "# Execution plans\n", "spark.sql(\"select * from myEvents where id=1\").explain(True)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Table(name='myevents', database=None, description=None, tableType='TEMPORARY', isTemporary=True),\n", " Table(name='t1', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Another syntax to create test tables\n", "df = spark.sql(\"select * from values (1, 'event1'), (2,'event2'), (3, 'event3') as (id,name)\")\n", "\n", "# this will cache the table, note that caching works lazily\n", "# the table will be cached only after the first access, that's why we add df.count()\n", "df.cache()\n", "df.count()\n", "\n", "# this registers a temporary view called t1\n", "df.createOrReplaceTempView(\"t1\")\n", "\n", "# list the tables and views in the catalog\n", "spark.catalog.listTables()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", 
"text": [ "root\n", " |-- id: integer (nullable = false)\n", " |-- name: string (nullable = false)\n", "\n", "+--------+---------+-------+\n", "|col_name|data_type|comment|\n", "+--------+---------+-------+\n", "| id| int| null|\n", "| name| string| null|\n", "+--------+---------+-------+\n", "\n" ] } ], "source": [ "# Show the table schema, with 2 different methods\n", "\n", "df.printSchema()\n", "spark.sql(\"describe t1\").show()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----------+\n", "| id| name|even_or_odd|\n", "+---+------+-----------+\n", "| 1|event1| 1|\n", "| 2|event2| 0|\n", "| 3|event3| 1|\n", "+---+------+-----------+\n", "\n" ] } ], "source": [ "# Basic select, list the columns you want to retrieve, can also add calculated columns\n", "\n", "spark.sql(\"select id, name, id % 2 even_or_odd from t1\").show()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----------+\n", "| id| name|even_or_odd|\n", "+---+------+-----------+\n", "| 1|event1| 1|\n", "| 2|event2| 0|\n", "+---+------+-----------+\n", "\n" ] } ], "source": [ "# Selection with a filter clause\n", "\n", "spark.sql(\"select id, name, id % 2 even_or_odd from t1 where id < 3\").show()\n" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+---------+-------+\n", "|Even_or_odd|N_samples|Sum_Ids|\n", "+-----------+---------+-------+\n", "| 1| 2| 4|\n", "| 0| 1| 2|\n", "+-----------+---------+-------+\n", "\n" ] } ], "source": [ "# Aggregations with group by \n", "\n", "spark.sql(\"\"\"select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids \n", " from t1 \n", " group by id % 2\"\"\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SQL for data analysis" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "# Join example using a parent-child relationship\n", "\n", "# Create test tables\n", "emp = spark.createDataFrame([(1, \"Emp1\", 10), (2,\"Emp2\", 10), (3, \"Emp3\", 20)], (\"id\",\"name\",\"dep_id\"))\n", "emp.createOrReplaceTempView(\"employees\")\n", "\n", "dep = spark.createDataFrame([(10, \"Department1\"), (20, \"Department2\"), (30, \"Department3\")], (\"id\",\"name\"))\n", "dep.createOrReplaceTempView(\"departments\")" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idemp_namedep_name
01Emp1Department1
12Emp2Department1
23Emp3Department2
\n", "
" ], "text/plain": [ " id emp_name dep_name\n", "0 1 Emp1 Department1\n", "1 2 Emp2 Department1\n", "2 3 Emp3 Department2" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Inner join\n", "spark.sql(\"\"\"\n", "select employees.id, employees.name emp_name, departments.name dep_name\n", "from employees join departments\n", "on employees.dep_id = departments.id\n", "order by employees.id\"\"\").toPandas()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
iddep_nameemp_name
010Department1Emp1
110Department1Emp2
220Department2Emp3
330Department3None
\n", "
" ], "text/plain": [ " id dep_name emp_name\n", "0 10 Department1 Emp1\n", "1 10 Department1 Emp2\n", "2 20 Department2 Emp3\n", "3 30 Department3 None" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Outer join\n", "spark.sql(\"\"\"\n", "select departments.id, departments.name dep_name, employees.name emp_name\n", "from departments left outer join employees\n", "on employees.dep_id = departments.id\n", "order by departments.id\"\"\").toPandas()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2022-09-26 16:57:29,433 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.\n", "2022-09-26 16:57:29,592 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamedep_idgreatest_id_same_departmentprevious_employee_name
01Emp1102None
12Emp2102Emp1
23Emp3203Emp2
\n", "
" ], "text/plain": [ " id name dep_id greatest_id_same_department previous_employee_name\n", "0 1 Emp1 10 2 None\n", "1 2 Emp2 10 2 Emp1\n", "2 3 Emp3 20 3 Emp2" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Window function\n", "spark.sql(\"\"\"\n", "select id, name, dep_id,\n", " max(id) over (partition by dep_id) as greatest_id_same_department,\n", " lag(name) over (order by id) as previous_employee_name\n", "from employees\n", "order by id\n", "\"\"\").toPandas()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "# A helper \"magic function\" for running PySpark SQL in Jupyter Notebooks\n", "\n", "from IPython.core.magic import register_line_cell_magic\n", "max_show_lines = 50 # Limit on the number of lines to show with %sql_show and %sql_display\n", "@register_line_cell_magic\n", "def sql_display(line, cell=None):\n", " \"\"\"Execute sql and convert results to Pandas DataFrame for pretty display or further processing.\n", " Use: %sql_display or %%sql_display\"\"\"\n", " val = cell if cell is not None else line \n", " return spark.sql(val).limit(max_show_lines).toPandas() " ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Even_or_oddN_samplesSum_Ids
0124
1012
\n", "
" ], "text/plain": [ " Even_or_odd N_samples Sum_Ids\n", "0 1 2 4\n", "1 0 1 2" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%sql_display\n", "\n", "select id % 2 as Even_or_odd, count(*) N_samples, sum(id) Sum_Ids \n", "from t1 \n", "group by id % 2" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idFizzBuzz
011
122
23Fizz
344
45Buzz
56Fizz
677
788
89Fizz
910Buzz
101111
1112Fizz
121313
131414
1415FizzBuzz
151616
161717
1718Fizz
181919
\n", "
" ], "text/plain": [ " id FizzBuzz\n", "0 1 1\n", "1 2 2\n", "2 3 Fizz\n", "3 4 4\n", "4 5 Buzz\n", "5 6 Fizz\n", "6 7 7\n", "7 8 8\n", "8 9 Fizz\n", "9 10 Buzz\n", "10 11 11\n", "11 12 Fizz\n", "12 13 13\n", "13 14 14\n", "14 15 FizzBuzz\n", "15 16 16\n", "16 17 17\n", "17 18 Fizz\n", "18 19 19" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%sql_display\n", "\n", "-- Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz\n", " \n", "select id, case\n", " when id % 15 = 0 then 'FizzBuzz'\n", " when id % 3 = 0 then 'Fizz'\n", " when id % 5 = 0 then 'Buzz'\n", " else cast(id as string)\n", " end as FizzBuzz\n", "from range(1,20)\n", "order by id" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+--------+\n", "| id|FizzBuzz|\n", "+---+--------+\n", "| 1| 1|\n", "| 2| 2|\n", "| 3| Fizz|\n", "| 4| 4|\n", "| 5| Buzz|\n", "| 6| Fizz|\n", "| 7| 7|\n", "| 8| 8|\n", "| 9| Fizz|\n", "| 10| Buzz|\n", "| 11| 11|\n", "| 12| Fizz|\n", "| 13| 13|\n", "| 14| 14|\n", "| 15|FizzBuzz|\n", "| 16| 16|\n", "| 17| 17|\n", "| 18| Fizz|\n", "| 19| 19|\n", "+---+--------+\n", "\n" ] } ], "source": [ "# this is the standard way to run Spark sql with PySpark\n", "# Fun with SQL: FizzBuzz, see https://en.wikipedia.org/wiki/Fizz_buzz\n", "\n", "spark.sql(\"\"\"\n", "select id, case\n", " when id % 15 = 0 then 'FizzBuzz'\n", " when id % 3 = 0 then 'Fizz'\n", " when id % 5 = 0 then 'Buzz'\n", " else cast(id as string)\n", " end as FizzBuzz\n", "from range(1,20)\n", "order by id\"\"\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Processing nested data with arrays, structs and maps" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------------------------+\n", "|id |temp_celsius |\n", "+---+----------------------------+\n", "|1 |[35, 36, 32, 30, 40, 42, 38]|\n", "|2 |[31, 32, 34, 55, 56] |\n", "+---+----------------------------+\n", "\n" ] } ], "source": [ "# Prepare test data with arrays\n", "# readings from a sensor measuring temperature in Celsius\n", "\n", "schema = \"id INT, temp_celsius ARRAY\"\n", "\n", "t_list = [1,[35, 36, 32, 30, 40, 42, 38]], [2,[31, 32, 34, 55, 56]]\n", "\n", "spark.createDataFrame(t_list, schema).createOrReplaceTempView(\"temp_data\")\n", "\n", "spark.sql(\"select * from temp_data\").show(10,False)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
first_temp_readingmax_reading
03542
13156
\n", "
" ], "text/plain": [ " first_temp_reading max_reading\n", "0 35 42\n", "1 31 56" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Example of array functions\n", "# Take first temperature reading and the max temperature reading\n", "\n", "spark.sql(\"select temp_celsius[0] first_temp_reading, array_max(temp_celsius) max_reading from temp_data\").toPandas()" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idtemp_celsiustemp_fahrenheit
01[35, 36, 32, 30, 40, 42, 38][95.0, 96.8, 89.6, 86.0, 104.0, 107.6, 100.4]
12[31, 32, 34, 55, 56][87.8, 89.6, 93.2, 131.0, 132.8]
\n", "
" ], "text/plain": [ " id temp_celsius \\\n", "0 1 [35, 36, 32, 30, 40, 42, 38] \n", "1 2 [31, 32, 34, 55, 56] \n", "\n", " temp_fahrenheit \n", "0 [95.0, 96.8, 89.6, 86.0, 104.0, 107.6, 100.4] \n", "1 [87.8, 89.6, 93.2, 131.0, 132.8] " ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Array procesing with Spark using \"higher order functions\" in SQL\n", "# Compute conversion from Fahrenheit from Celsius for an array of temperatures\n", "\n", "spark.sql(\"\"\"\n", "SELECT id, temp_celsius, \n", " transform(temp_celsius, t -> ((t * 9) / 5) + 32) as temp_fahrenheit \n", " FROM temp_data\n", "\"\"\").toPandas()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------------------------+--------+\n", "|id |temp_celsius |high |\n", "+---+----------------------------+--------+\n", "|1 |[35, 36, 32, 30, 40, 42, 38]|[40, 42]|\n", "|2 |[31, 32, 34, 55, 56] |[55, 56]|\n", "+---+----------------------------+--------+\n", "\n" ] } ], "source": [ "# Array procesing using Spark higher order functions in SQL\n", "# Filter temperatures > 38C from an array of temperature values\n", "\n", "spark.sql(\"\"\"\n", "SELECT id, temp_celsius, filter(temp_celsius, t -> t > 38) as high \n", "FROM temp_data\n", "\"\"\").show(10,False)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----------------+\n", "|id |collect_list(val)|\n", "+---+-----------------+\n", "|1 |[40, 42] |\n", "|2 |[55, 56] |\n", "+---+-----------------+\n", "\n" ] } ], "source": [ "# This demonstrates using the \"legacy\" SQL functions explode and collect_list \n", "# the performance is suboptimal, especially for large arrays\n", "# also quite hard to read\n", "\n", "spark.sql(\"\"\"\n", "with exploded_data as (\n", " select id, explode(temp_celsius) val from temp_data\n", ")\n", "select id, collect_list(val) from exploded_data where val > 38 group by id\n", "\"\"\").show(10,False)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------------------+\n", "| id|average_temperature|\n", "+---+-------------------+\n", "| 1| 36.1|\n", "| 2| 41.6|\n", "+---+-------------------+\n", "\n" ] } ], "source": [ "# Example of array functions, aggregate (higher orger function) and cardinality\n", "\n", "# - aggregate(expr, start, merge, finish) - Applies a binary operator to an initial state and all elements in the array, \n", "# and reduces this to a single state. The final state is converted into the final result by applying a finish function.\n", "# - cardinality(expr) - Returns the size of an array or a map. 
\n", "\n", "spark.sql(\"\"\"\n", " SELECT id, aggregate(temp_celsius, 0, (acc, x) -> acc + x,\n", " acc -> round(acc / cardinality(temp_celsius),1)) as average_temperature\n", " from temp_data\"\"\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Structs and arrays" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- event: long (nullable = true)\n", " |-- HLT: struct (nullable = true)\n", " | |-- flag1: boolean (nullable = true)\n", " | |-- flag2: boolean (nullable = true)\n", " |-- muons: array (nullable = true)\n", " | |-- element: struct (containsNull = true)\n", " | | |-- pt: float (nullable = true)\n", " | | |-- eta: float (nullable = true)\n", " | | |-- mass: float (nullable = true)\n", "\n", "+-----+------------+----------------------------------+\n", "|event|HLT |muons |\n", "+-----+------------+----------------------------------+\n", "|1000 |{true, true}|[{1.1, 2.2, 1.0}, {1.2, 2.2, 1.0}]|\n", "|1001 |{true, true}|[{1.2, 2.3, 1.0}, {1.3, 2.3, 1.0}]|\n", "+-----+------------+----------------------------------+\n", "\n" ] } ], "source": [ "# Example with structs and arrays\n", "# Inspired from physics datasets\n", "\n", "schema = \"event LONG, HLT struct, muons ARRAY>\"\n", "\n", "t_list = [[1000, [True,True] , [[1.1,2.2,1.0], [1.2,2.2,1.0]]], [1001, [True,True], [[1.2,2.3, 1.0],[1.3,2.3, 1.0]]]]\n", "\n", "df = spark.createDataFrame(t_list, schema)\n", "df.printSchema()\n", "\n", "df.createOrReplaceTempView(\"particles\")\n", "\n", "spark.sql(\"select * from particles\").show(10,False)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+-----+---------------+\n", "|event|flag1|muons[0] |\n", "+-----+-----+---------------+\n", "|1000 |true |{1.1, 2.2, 1.0}|\n", "|1001 |true |{1.2, 2.3, 1.0}|\n", "+-----+-----+---------------+\n", "\n" ] } ], "source": [ "# display only flag1 and the first muon in the list\n", "\n", "spark.sql(\"select event, HLT.flag1, muons[0] from particles\").show(10,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use maps for storing key-value pairs" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- id: integer (nullable = true)\n", " |-- myKeyValues: map (nullable = true)\n", " | |-- key: integer\n", " | |-- value: integer (valueContainsNull = true)\n", "\n", "+----+---------------------------+\n", "|id |myKeyValues |\n", "+----+---------------------------+\n", "|1000|{1 -> 1, 2 -> 2} |\n", "|1001|{1 -> 10, 2 -> 11, 3 -> 12}|\n", "+----+---------------------------+\n", "\n" ] } ], "source": [ "schema = \"id INT, myKeyValues MAP\"\n", "\n", "t_list = [[1000, {1:1, 2:2}], [1001, {1:10, 2:11, 3:12}]]\n", "df = spark.createDataFrame(t_list, schema)\n", "df.createOrReplaceTempView(\"t1_map\")\n", "\n", "df.printSchema()\n", "df.show(10,False)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+---------------------------+\n", "|id |mymap_transformed |\n", "+----+---------------------------+\n", "|1000|{1 -> 2, 2 -> 4} |\n", "|1001|{1 -> 11, 2 -> 13, 3 -> 15}|\n", "+----+---------------------------+\n", "\n" ] } ], "source": [ "spark.sql(\"SELECT id, transform_values(myKeyValues, (k, v) -> k + v) as mymap_transformed from t1_map\").show(10, 
False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python User Defined Functions (UDF)" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|slowf(1)|\n", "+--------+\n", "| 2|\n", "+--------+\n", "\n", "+--------+\n", "|slowf(1)|\n", "+--------+\n", "| 2|\n", "+--------+\n", "\n", "CPU times: user 2.72 ms, sys: 1.08 ms, total: 3.8 ms\n", "Wall time: 1.12 s\n", "+---+---------+\n", "| id|slowf(id)|\n", "+---+---------+\n", "| 0| 0|\n", "| 1| 2|\n", "| 2| 4|\n", "| 3| 6|\n", "| 4| 8|\n", "| 5| 10|\n", "| 6| 12|\n", "| 7| 14|\n", "| 8| 16|\n", "| 9| 18|\n", "+---+---------+\n", "\n", "CPU times: user 3.33 ms, sys: 1.56 ms, total: 4.89 ms\n", "Wall time: 10.2 s\n" ] } ], "source": [ "# Basic Python UDF example\n", "\n", "import time\n", "def slowf(s):\n", " time.sleep(1) # this waits for 1 second\n", " return 2*s\n", "\n", "spark.udf.register(\"slowf\", slowf)\n", "\n", "# warmup\n", "spark.sql(\"select slowf(1)\").show()\n", "\n", "%time spark.sql(\"select slowf(1)\").show()\n", "\n", "%time spark.sql(\"select id, slowf(id) from range(10)\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pandas UDF - improves on Python UDF" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "scrolled": false }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Python Pandas UDF example\n", "# Pandas UDF are a faster implementation for Python UDF\n", "\n", "import time\n", "import pandas as pd\n", "from pyspark.sql.functions import pandas_udf\n", "\n", "@pandas_udf('long')\n", "def multiply_func(a,b):\n", " time.sleep(1)\n", " return a * b\n", "\n", "spark.udf.register(\"multiply_func\", multiply_func)\n", "\n" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+\n", "|multiply_func(1, 1)|\n", "+-------------------+\n", "| 1|\n", "+-------------------+\n", "\n", "+-------------------+\n", "|multiply_func(1, 1)|\n", "+-------------------+\n", "| 1|\n", "+-------------------+\n", "\n", "CPU times: user 2.43 ms, sys: 558 µs, total: 2.99 ms\n", "Wall time: 1.1 s\n", "+---+--------------------+\n", "| id|multiply_func(id, 2)|\n", "+---+--------------------+\n", "| 0| 0|\n", "| 1| 2|\n", "| 2| 4|\n", "| 3| 6|\n", "| 4| 8|\n", "| 5| 10|\n", "| 6| 12|\n", "| 7| 14|\n", "| 8| 16|\n", "| 9| 18|\n", "+---+--------------------+\n", "\n", "CPU times: user 3.48 ms, sys: 190 µs, total: 3.67 ms\n", "Wall time: 1.12 s\n", "\n", "Now running on 10000 rows\n", "CPU times: user 39.6 ms, sys: 4.83 ms, total: 44.4 ms\n", "Wall time: 1.39 s\n" ] } ], "source": [ "# Can you guess how long will it take to run the udf with 1 row, 10 rows and 10000 rows?\n", "# Run the followin cell to see the answer\n", "\n", "# warmup\n", "spark.sql(\"select multiply_func(1,1)\").show()\n", "\n", "%time spark.sql(\"select multiply_func(1,1)\").show()\n", "\n", "%time spark.sql(\"select id, multiply_func(id,2) from range(10)\").show()\n", "\n", "num_rows=10000\n", "print(f\"\\nNow running on {num_rows} rows\")\n", "%time a=spark.sql(f\"select multiply_func(id,2) from range({num_rows})\").collect()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What will change if I run on 10001 rows?\n", "\n", "num_rows=10001\n", "print(f\"\\nNow running on 
{num_rows} rows\")\n", "%time a=spark.sql(f\"select multiply_func(id,2) from range({num_rows})\").collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Pandas UDF with type hints for Python 3\n", "# This is the recommended syntax introduced Spark 3\n", "\n", "import time\n", "import pandas as pd\n", "from pyspark.sql.functions import pandas_udf\n", "\n", "@pandas_udf('long')\n", "def multiply_func2(a: pd.Series, b: pd.Series) -> pd.Series:\n", " time.sleep(1)\n", " return a * b\n", "\n", "spark.udf.register(\"multiply_func2\", multiply_func)\n", "\n", "# warmup\n", "spark.sql(\"select multiply_func2(1,1)\").show()\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# mapInPandas\n", "# This allows to return a different number of rows than what was in the input\n", "# while still using arrow serialization for performance\n", "# Note: this uses the DataFrame API\n", "\n", "df = spark.createDataFrame([(1, 21), (2, 30)], (\"id\", \"age\"))\n", "\n", "df.show()\n", "\n", "def filter_func(iterator):\n", " for pdf in iterator:\n", " yield pdf[pdf.id == 1]\n", "\n", "df.mapInPandas(filter_func, schema=df.schema).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# End the Spark application\n", "spark.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" } }, "nbformat": 4, "nbformat_minor": 1 }
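# Editor's note -- on the 10000 vs 10001 row question in the Pandas UDF cells above:
# Pandas UDFs receive their input in Arrow batches of
# spark.sql.execution.arrow.maxRecordsPerBatch rows (10000 by default), so 10001 rows
# are split into two batches and the sleep(1) inside the UDF runs roughly twice per
# partition. A sketch to inspect or change the setting:
print(spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch", "10000"))
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)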