{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## This notebook is part of the Apache Spark training delivered by CERN IT\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "skip" } }, "source": [ "Run this notebook from Jupyter with Python kernel\n", "- When running on CERN SWAN, do not attach the notebook to a Spark cluster, but rather run it locally on the SWAN container (which is the default)\n", "- If running this outside CERN SWAN, please make sure to have PySpark installed: `pip install pyspark`\n", "\n", "\n", "In order to run this notebook as slides:\n", " - on SWAN click on the button \"Enter/Exit RISE slideshow\" in the ribbon\n", " - on other environments please make sure to have the RISE extension installed `pip install RISE` " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### SPARK DataFrame Hands-On Lab\n", "Contact: Luca.Canali@cern.ch\n", "\n", "### Objective: Perform Basic DataFrame Operations\n", "1. Creating DataFrames\n", "2. Select columns\n", "3. Add, rename and drop columns\n", "4. Filtering rows\n", "5. Aggregations" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Getting started: create the SparkSession" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true, "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# !pip install pyspark\n", "\n", "from pyspark.sql import SparkSession\n", "\n", "spark = (SparkSession.builder\n", " .master(\"local[*]\") \\\n", " .appName(\"DataFrame HandsOn 1\") \\\n", " .config(\"spark.ui.showConsoleProgress\",\"false\") \\\n", " .getOrCreate()\n", " )\n", "\n", "spark" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The master `local[*]` means that the executors are in the same node that is running the driver. The `*` tells Spark to start as many executors as there are logical cores available" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Hands-On 1 - Construct a DataFrame from csv file\n", "This demostrates how to read a csv file and construct a DataFrame. 
\n", "We will use the online retail dataset from Kaggle, credits: https://www.kaggle.com/datasets/vijayuv/onlineretail\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "#### First, let's inspect the csv content" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country\r\n", "536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom\r\n", "536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom\r\n" ] } ], "source": [ "!gzip -cd ../data/online-retail-dataset.csv.gz 2>&1| head -n3" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "online_retail_schema=\"InvoiceNo int, StockCode string, Description string, Quantity int,\\\n", "InvoiceDate timestamp,UnitPrice float,CustomerId int, Country string\"" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "df = (spark.read\n", " .option(\"header\", \"true\")\n", " .option(\"timestampFormat\", \"M/d/yyyy H:m\")\n", " .csv(\"../data/online-retail-dataset.csv.gz\",\n", " schema=online_retail_schema)\n", " )" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "#### Inspect the data" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode|Description |Quantity|InvoiceDate |UnitPrice|CustomerId|Country |\n", "+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+\n", "|536365 |85123A |WHITE HANGING HEART T-LIGHT HOLDER|6 |2010-12-01 08:26:00|2.55 |17850 |United Kingdom|\n", "|536365 |71053 |WHITE METAL LANTERN |6 |2010-12-01 08:26:00|3.39 |17850 |United Kingdom|\n", "+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "df.show(2, False)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "#### Show columns" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- InvoiceNo: integer (nullable = true)\n", " |-- StockCode: string (nullable = true)\n", " |-- Description: string (nullable = true)\n", " |-- Quantity: integer (nullable = true)\n", " |-- InvoiceDate: timestamp (nullable = true)\n", " |-- UnitPrice: float (nullable = true)\n", " |-- CustomerId: integer (nullable = true)\n", " |-- Country: string (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Hands-On 2 - Spark Transformations - select, add, rename and drop columns" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Select dataframe columns" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+\n", "| Country|\n", "+--------------+\n", "|United Kingdom|\n", "|United Kingdom|\n", 
"+--------------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "# select single column\n", "\n", "df.select(\"Country\").show(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Select multiple columns\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+----------------------------------+---------+\n", "|StockCode|Description |UnitPrice|\n", "+---------+----------------------------------+---------+\n", "|85123A |WHITE HANGING HEART T-LIGHT HOLDER|2.55 |\n", "|71053 |WHITE METAL LANTERN |3.39 |\n", "+---------+----------------------------------+---------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "df.select(\"StockCode\",\"Description\",\"UnitPrice\").show(n=2, truncate=False)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "data": { "text/plain": [ "['InvoiceNo',\n", " 'StockCode',\n", " 'Description',\n", " 'Quantity',\n", " 'InvoiceDate',\n", " 'UnitPrice',\n", " 'CustomerId',\n", " 'Country']" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.columns" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+--------------------+--------+-------------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|\n", "+---------+---------+--------------------+--------+-------------------+\n", "| 536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00|\n", "| 536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00|\n", "+---------+---------+--------------------+--------+-------------------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "# select first 5 columns\n", "df.select(df.columns[0:5]).show(2)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|HighValueItem|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+\n", "| 536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00| 2.55| 17850|United Kingdom| false|\n", "| 536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00| 3.39| 17850|United Kingdom| false|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-------------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "# selects all the original columns and adds a new column that specifies high value item\n", "(df.selectExpr(\n", " \"*\", # all original columns\n", " \"(UnitPrice > 100) as HighValueItem\")\n", " .show(2)\n", ")" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+--------------+\n", "|TotalQuantity|InventoryValue|\n", "+-------------+--------------+\n", "| 5176450| 2498803|\n", "+-------------+--------------+\n", "\n" ] } ], "source": [ "# selects all the original columns and adds a new column that specifies high value item\n", "(df.selectExpr(\n", " \"sum(Quantity) as TotalQuantity\",\n", " 
\"cast(sum(UnitPrice) as int) as InventoryValue\")\n", " .show()\n", ")" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "#### Adding, renaming and dropping columns" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+----------------------------------+---------+--------+------------+\n", "|InvoiceNo|Description |UnitPrice|Quantity|InvoiceValue|\n", "+---------+----------------------------------+---------+--------+------------+\n", "|536365 |WHITE HANGING HEART T-LIGHT HOLDER|2.55 |6 |15.299999 |\n", "|536365 |WHITE METAL LANTERN |3.39 |6 |20.34 |\n", "+---------+----------------------------------+---------+--------+------------+\n", "only showing top 2 rows\n", "\n", "+---------+----------------------------------+---------+--------+---------+\n", "|InvoiceNo|Description |UnitPrice|Quantity|LineTotal|\n", "+---------+----------------------------------+---------+--------+---------+\n", "|536365 |WHITE HANGING HEART T-LIGHT HOLDER|2.55 |6 |15.299999|\n", "|536365 |WHITE METAL LANTERN |3.39 |6 |20.34 |\n", "+---------+----------------------------------+---------+--------+---------+\n", "only showing top 2 rows\n", "\n", "+---------+----------------------------------+---------+--------+\n", "|InvoiceNo|Description |UnitPrice|Quantity|\n", "+---------+----------------------------------+---------+--------+\n", "|536365 |WHITE HANGING HEART T-LIGHT HOLDER|2.55 |6 |\n", "|536365 |WHITE METAL LANTERN |3.39 |6 |\n", "+---------+----------------------------------+---------+--------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "# add a new column called InvoiceValue\n", "from pyspark.sql.functions import expr\n", "df_1 = (df\n", " .withColumn(\"InvoiceValue\", expr(\"UnitPrice * Quantity\"))\n", " .select(\"InvoiceNo\",\"Description\",\"UnitPrice\",\"Quantity\",\"InvoiceValue\")\n", " )\n", "df_1.show(2, False)\n", "\n", "# rename InvoiceValue to LineTotal\n", "df_2 = df_1.withColumnRenamed(\"InvoiceValue\",\"LineTotal\")\n", "df_2.show(2, False)\n", "\n", "# drop a column\n", "df_2.drop(\"LineTotal\").show(2, False)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Hands-On 3 - Spark Transformations - filter, sort and cast" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "| 556444| 22502|PICNIC BASKET WIC...| 60|2011-06-10 15:28:00| 649.5| 15098|United Kingdom|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "| 556444| 22502|PICNIC BASKET WIC...| 60|2011-06-10 15:28:00| 649.5| 15098|United Kingdom|\n", 
"+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "| 556444| 22502|PICNIC BASKET WIC...| 60|2011-06-10 15:28:00| 649.5| 15098|United Kingdom|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import col\n", "\n", "# select invoice lines with quantity > 50 and unitprice > 20\n", "df.where(col(\"Quantity\") > 20).where(col(\"UnitPrice\") > 50).show(2)\n", "df.filter(df.Quantity > 20).filter(df.UnitPrice > 50).show(2)\n", "df.filter(\"Quantity > 20 and UnitPrice > 50\").show(2)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "| 536378| 21212|PACK OF 72 RETROS...| 120|2010-12-01 09:37:00| 0.42| 14688|United Kingdom|\n", "| null| D| Discount| -1|2010-12-01 09:41:00| 27.5| 14527|United Kingdom|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "only showing top 2 rows\n", "\n" ] } ], "source": [ "# select invoice lines with quantity > 100 or unitprice > 20\n", "df.where((col(\"Quantity\") > 100) | (col(\"UnitPrice\") > 20)).show(2)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+---------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+---------------+--------+-------------------+---------+----------+--------------+\n", "| null| B|Adjust bad debt| 1|2011-08-12 14:52:00|-11062.06| null|United Kingdom|\n", "| null| B|Adjust bad debt| 1|2011-08-12 14:51:00|-11062.06| null|United Kingdom|\n", "+---------+---------+---------------+--------+-------------------+---------+----------+--------------+\n", "only showing top 2 rows\n", "\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerId| Country|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "| 581483| 23843|PAPER CRAFT , LIT...| 80995|2011-12-09 09:15:00| 2.08| 16446|United Kingdom|\n", "| 541431| 23166|MEDIUM CERAMIC TO...| 74215|2011-01-18 10:01:00| 1.04| 12346|United Kingdom|\n", "| 578841| 84826|ASSTD DESIGN 3D P...| 12540|2011-11-25 15:57:00| 0.0| 13256|United Kingdom|\n", "| 542504| 37413| null| 5568|2011-01-28 12:03:00| 0.0| null|United Kingdom|\n", "| 573008| 84077|WORLD WAR 2 GLIDE...| 4800|2011-10-27 12:26:00| 0.21| 12901|United Kingdom|\n", "| 554868| 22197|SMALL POPCORN HOLDER| 
"| 556231| 85123A| ?| 4000|2011-06-09 15:04:00| 0.0| null|United Kingdom|\n", "| 544612| 22053|EMPIRE DESIGN ROS...| 3906|2011-02-22 10:43:00| 0.82| 18087|United Kingdom|\n", "| 560599| 18007|ESSENTIAL BALM 3....| 3186|2011-07-19 17:04:00| 0.06| 14609|United Kingdom|\n", "| 550461| 21108|FAIRY CAKE FLANNE...| 3114|2011-04-18 13:20:00| 2.1| 15749|United Kingdom|\n", "+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+\n", "only showing top 10 rows\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import desc, asc\n", "\n", "# sort in the default order: ascending\n", "df.orderBy(expr(\"UnitPrice\")).show(2)\n", "\n", "# sort by multiple columns; the col().desc()/.asc() methods are equivalent to the desc()/asc() functions imported above\n", "df.orderBy(col(\"Quantity\").desc(), col(\"UnitPrice\").asc()).show(10)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Hands-On 4 - Spark Transformations - aggregations\n", "Full list of built-in functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------------+\n", "|count(DISTINCT CustomerID)|\n", "+--------------------------+\n", "| 4372|\n", "+--------------------------+\n", "\n", "CPU times: user 1.48 ms, sys: 2.55 ms, total: 4.02 ms\n", "Wall time: 1.49 s\n" ] } ], "source": [ "%%time\n", "# Count distinct customers\n", "from pyspark.sql.functions import countDistinct\n", "df.select(countDistinct(\"CustomerID\")).show()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------------------------------+\n", "|approx_count_distinct(CustomerID)|\n", "+---------------------------------+\n", "| 4336|\n", "+---------------------------------+\n", "\n", "CPU times: user 2.36 ms, sys: 1.57 ms, total: 3.93 ms\n", "Wall time: 1.24 s\n" ] } ], "source": [ "%%time\n", 
"# approx. count of distinct customers; rsd=0.1 allows up to ~10% estimation error\n", "from pyspark.sql.functions import approx_count_distinct\n", "df.select(approx_count_distinct(\"CustomerID\", 0.1)).show()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+-------------+-------------+\n", "| avg_purchases|max_purchases|min_purchases|\n", "+----------------+-------------+-------------+\n", "|9.55224954743324| 80995| -80995|\n", "+----------------+-------------+-------------+\n", "\n" ] } ], "source": [ "# average, maximum and minimum purchase quantity\n", "from pyspark.sql.functions import avg, max, min\n", "( df.select(\n", " avg(\"Quantity\").alias(\"avg_purchases\"),\n", " max(\"Quantity\").alias(\"max_purchases\"),\n", " min(\"Quantity\").alias(\"min_purchases\"))\n", " .show()\n", ")" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Hands-On 5 - Spark Transformations - grouping" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+----------+-----+\n", "|InvoiceNo|CustomerId|count|\n", "+---------+----------+-----+\n", "| 536573| 17025| 4|\n", "| 537228| 17677| 1|\n", "| 537419| 13495| 14|\n", "| 538093| 12682| 33|\n", "| 538648| 17937| 5|\n", "+---------+----------+-----+\n", "only showing top 5 rows\n", "\n", "+---------+------------------+--------------------+\n", "|InvoiceNo| avg(Quantity)|stddev_pop(Quantity)|\n", "+---------+------------------+--------------------+\n", "| 536532| 25.36986301369863| 16.850272831671976|\n", "| 537632| 1.0| 0.0|\n", "| 538708| 10.61111111111111| 7.150282736359209|\n", "| 538877|14.258278145695364| 27.56989037543246|\n", "| 538993| 9.333333333333334| 2.748737083745107|\n", "+---------+------------------+--------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "# count of items on the invoice\n", "df.groupBy(\"InvoiceNo\", \"CustomerId\").count().show(5)\n", "\n", "# grouping with expressions\n", "df.groupBy(\"InvoiceNo\").agg(expr(\"avg(Quantity)\"),expr(\"stddev_pop(Quantity)\"))\\\n", " .show(5)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Read the CSV file into a DataFrame\n", "\n", "`%%time` is an IPython magic: https://ipython.readthedocs.io/en/stable/interactive/magics.html\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It's possible to read files without specifying the schema. Some file formats (Parquet is one of them) embed the schema, so Spark can read them without any extra information. For formats without an embedded schema (CSV, JSON, ...) Spark can infer it, at the cost of an extra pass over the data.\n", 
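"\n", "A quick way to inspect the outcome, as a sketch (run it after the two read cells below, which define `df` and `df_infer`):\n", "```python\n", "# compare the declared schema with the inferred one\n", "df.printSchema()\n", "df_infer.printSchema()\n", "print(df.schema == df_infer.schema)  # inference may pick different types\n", "```\n", 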
"Let's see the difference in terms of time and of results:" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "slideshow": { "slide_type": "subslide" } }, "outputs": [], "source": [ "online_retail_schema=\"InvoiceNo int, StockCode string, Description string, Quantity int,\\\n", "InvoiceDate timestamp, UnitPrice float, CustomerId int, Country string\"" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "slideshow": { "slide_type": "-" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 735 µs, sys: 2.35 ms, total: 3.08 ms\n", "Wall time: 36.4 ms\n" ] } ], "source": [ "%%time\n", "df = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .option(\"timestampFormat\", \"M/d/yyyy H:m\") \\\n", " .csv(\"../data/online-retail-dataset.csv.gz\",\n", " schema=online_retail_schema)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "scrolled": true, "slideshow": { "slide_type": "-" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.45 ms, sys: 741 µs, total: 3.19 ms\n", "Wall time: 2.03 s\n" ] } ], "source": [ "%%time\n", "df_infer = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .option(\"inferSchema\", \"true\") \\\n", " .csv(\"../data/online-retail-dataset.csv.gz\")" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Exercises" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Reminder: documentation at \n", "https://spark.apache.org/docs/latest/api/python/index.html" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "If you didn't run the previous cells, run the following one:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder \\\n", " .master(\"local[*]\") \\\n", " .appName(\"DataFrame HandsOn 1\") \\\n", " .config(\"spark.ui.showConsoleProgress\",\"false\") \\\n", " .getOrCreate()\n", "\n", "online_retail_schema=\"InvoiceNo int, StockCode string, Description string, Quantity int,\\\n", "InvoiceDate timestamp, UnitPrice float, CustomerId int, Country string\"\n", "\n", "df = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .option(\"timestampFormat\", \"M/d/yyyy H:m\") \\\n", " .csv(\"../data/online-retail-dataset.csv.gz\",\n", " schema=online_retail_schema)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Show 5 lines of the \"Description\" column" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Count the number of distinct invoices in the DataFrame" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Find out in which month most invoices have been issued" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Task: Filter the lines where the Quantity is more than 30" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Task: Show the four most sold items (by quantity)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "subslide" } }, "source": [ "Bonus question: why do these two operations return different results?\n", "
Hint: look at the documentation " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(df.select(\"InvoiceNo\").distinct().count())\n", "from pyspark.sql.functions import countDistinct\n", "df.select(countDistinct(\"InvoiceNo\")).show()" ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" }, "sparkconnect": { "bundled_options": [], "list_of_options": [] } }, "nbformat": 4, "nbformat_minor": 1 }