{ "cells": [ { "cell_type": "markdown", "id": "c97f8c4d", "metadata": {}, "source": [ "# Apache Spark reading from Databases via JDBC\n", "\n", "This notebook shows how to access relational databases using Apache Spark. \n", "It is an example of extracting data from Oracle tables and storing the contents into Apache Parquet files, for reporting and data analysis purposes. \n", "\n", "For more details on how to use and tune the Spark JDBC data source, see also:\n", " - https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_Oracle_JDBC_Howto.md\n", " - https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases\n", "\n", "Author and contact: Luca.Canali@cern.ch \n", "November, 2022" ] }, { "cell_type": "markdown", "id": "4152ca50", "metadata": {}, "source": [ "## Test setup - Oracle DB instance\n", "This notebook illustrates using the Spark JDBC datasource for connecting to Oracle databases, but it can easily be modified for other databases. \n", "For the puposes of this demo we will use a test database on a docker container (see instructions below). \n", "You can also use an existing database for testing instead, for example a database on CERN developmnet db devdb19 (open a ticket to Oracle services at CERN if you need one). \n", "Setup of a test Oracle DB using containers: \n", " - run oracle xe on a container from gvenzl dockerhub repo https://github.com/gvenzl/oci-oracle-xe\n", " - `docker run -d --name mydb1 -e ORACLE_PASSWORD=oracle -p 1521:1521 gvenzl/oracle-xe:latest`\n", " - wait till the DB is started, check logs at: `docker logs -f mydb1`\n" ] }, { "cell_type": "markdown", "id": "22d54a3d", "metadata": {}, "source": [ "## Read Oracle tables using the Spark JDBC data source" ] }, { "cell_type": "code", "execution_count": null, "id": "be0c7ca3", "metadata": { "scrolled": false }, "outputs": [], "source": [ "#\n", "# Local mode: run this when using CERN SWAN not connected to a cluster \n", "# or run it on a private Jupyter notebook instance\n", "# Dependency: PySpark (use SWAN or pip install pyspark)\n", "#\n", "# For CERN users: when using CERN SWAN connected to a cluster (analytix or cloud resources)\n", "# do not run this but rather click on the (star) button\n", "# add the configuration spark.jars.packages to point to the Oracle JDBC jar\n", "\n", "# Start the Spark Session\n", "from pyspark.sql import SparkSession\n", "spark = (SparkSession.builder\n", " .appName(\"Spark JDBC to Oracle\")\n", " .master(\"local[*]\")\n", " .config(\"spark.driver.memory\", \"2g\")\n", " .config(\"spark.jars.packages\", \"com.oracle.database.jdbc:ojdbc8:21.7.0.0\")\n", " .config(\"spark.ui.showConsoleProgress\", \"false\")\n", " .getOrCreate()\n", " )" ] }, { "cell_type": "code", "execution_count": 2, "id": "cabed284", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.1
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
Spark JDBC to Oracle
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark" ] }, { "cell_type": "code", "execution_count": 3, "id": "2abaacd6", "metadata": {}, "outputs": [], "source": [ "# Edit with the target db username \n", "db_user = \"system\"\n" ] }, { "cell_type": "code", "execution_count": 4, "id": "b49dd991", "metadata": {}, "outputs": [], "source": [ "# Edit with the Oracle DB alias details\n", "# CERN users: see also the file /eos/project/o/oracle/public/admin/tnsnames.ora\n", "\n", "# dbserver:port/service_name\n", "db_connect_string = \"localhost:1521/XEPDB1\" \n" ] }, { "cell_type": "code", "execution_count": 5, "id": "0e34337d", "metadata": {}, "outputs": [], "source": [ "# Use getpass to avoid storing passwords inside notebooks\n", "# import getpass\n", "# db_pass = getpass.getpass()\n", "db_pass = \"oracle\"\n" ] }, { "cell_type": "code", "execution_count": 6, "id": "21010a50", "metadata": {}, "outputs": [], "source": [ "# Edit with the query to extract data from the target database\n", "# This is a dummy query just for demo purposes\n", "myquery = \"select rownum as id from dual connect by level<=10\"\n" ] }, { "cell_type": "code", "execution_count": 7, "id": "4a5dcd3a", "metadata": {}, "outputs": [], "source": [ "# This maps the Oracle query/table to a Spark DataFrame\n", "\n", "df = (spark.read.format(\"jdbc\").\n", " option(\"url\", f\"jdbc:oracle:thin:@{db_connect_string}\").\n", " option(\"driver\", \"oracle.jdbc.driver.OracleDriver\").\n", " option(\"query\", myquery).\n", " option(\"user\", db_user).\n", " option(\"password\", db_pass).\n", " option(\"fetchsize\", 10000).\n", " load()\n", " )" ] }, { "cell_type": "code", "execution_count": 8, "id": "3bf01f9b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- ID: decimal(38,10) (nullable = true)\n", "\n", "+-------------+\n", "| ID|\n", "+-------------+\n", "| 1.0000000000|\n", "| 2.0000000000|\n", "| 3.0000000000|\n", "| 4.0000000000|\n", "| 5.0000000000|\n", "| 6.0000000000|\n", "| 7.0000000000|\n", "| 8.0000000000|\n", "| 9.0000000000|\n", "|10.0000000000|\n", "+-------------+\n", "\n" ] } ], "source": [ "# Show schema and data for testing purposes\n", "df.printSchema()\n", "df.show()" ] }, { "cell_type": "code", "execution_count": 9, "id": "5d6f24be", "metadata": {}, "outputs": [], "source": [ "# This is the main action:\n", "# - read from Oracle (see definition of the DataFrame df, above)\n", "# - write to Parquet files\n", "# - note: only one file will be written by this, \n", "# as the JDBC data source will use only 1 partition\n", "\n", "# customize\n", "path = \"/tmp/\"\n", "table_name = \"test\"\n", "\n", "df.write.mode(\"overwrite\").parquet(path + table_name + \".parquet\")" ] }, { "cell_type": "code", "execution_count": 10, "id": "564a9196", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "part-00000-f2a890a2-d5c3-4358-b1db-005af2990a5e-c000.snappy.parquet _SUCCESS\r\n" ] } ], "source": [ "# Show data on the filesystem\n", "\n", "# local filesystem\n", "!ls /tmp/test.parquet\n", "\n", "# HDFS\n", "# ! 
{ "cell_type": "code", "execution_count": null, "id": "055c2a6f", "metadata": {}, "outputs": [], "source": [ "# spark.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "sparkconnect": { "bundled_options": [ "ComputeIntensive" ], "list_of_options": [ { "name": "spark.jars.packages", "value": "com.oracle.database.jdbc:ojdbc8:21.7.0.0" } ] } }, "nbformat": 4, "nbformat_minor": 5 }