Spark_JDBC_Oracle.ipynb Open in SWAN Download

Apache Spark reading from Databases via JDBC

This notebook shows how to access relational databases using Apache Spark.
It is an example of extracting data from Oracle tables and storing the contents into Apache Parquet files, for reporting and data analysis purposes.

For more details on how to use and tune the Spark JDBC data source, see also:

Author and contact: Luca.Canali@cern.ch
November, 2022

Test setup - Oracle DB instance

This notebook illustrates using the Spark JDBC datasource for connecting to Oracle databases, but it can easily be modified for other databases.
For the puposes of this demo we will use a test database on a docker container (see instructions below).
You can also use an existing database for testing instead, for example a database on CERN developmnet db devdb19 (open a ticket to Oracle services at CERN if you need one).
Setup of a test Oracle DB using containers:

  • run oracle xe on a container from gvenzl dockerhub repo https://github.com/gvenzl/oci-oracle-xe
  • docker run -d --name mydb1 -e ORACLE_PASSWORD=oracle -p 1521:1521 gvenzl/oracle-xe:latest
  • wait till the DB is started, check logs at: docker logs -f mydb1

Read Oracle tables using the Spark JDBC data source

In [ ]:
#
# Local mode: run this when using CERN SWAN not connected to a cluster 
#             or run it on a private Jupyter notebook instance
#             Dependency: PySpark (use SWAN or pip install pyspark)
#
# For CERN users: when using CERN SWAN connected to a cluster (analytix or cloud resources)
#                 do not run this but rather click on the (star) button
#                 add the configuration spark.jars.packages to point to the Oracle JDBC jar

# Start the Spark Session
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("Spark JDBC to Oracle")
         .master("local[*]")
         .config("spark.driver.memory", "2g")
         .config("spark.jars.packages", "com.oracle.database.jdbc:ojdbc8:21.7.0.0")
         .config("spark.ui.showConsoleProgress", "false")
         .getOrCreate()
        )
In [2]:
spark
Out[2]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.3.1
Master
local[*]
AppName
Spark JDBC to Oracle
In [3]:
# Edit with the target db username 
db_user = "system"
In [4]:
# Edit with the Oracle DB alias details
# CERN users: see also the file /eos/project/o/oracle/public/admin/tnsnames.ora

# dbserver:port/service_name
db_connect_string = "localhost:1521/XEPDB1"
In [5]:
# Use getpass to avoid storing passwords inside notebooks
# import getpass
# db_pass = getpass.getpass()
db_pass = "oracle"
In [6]:
# Edit with the query to extract data from the target database
# This is a dummy query just for demo purposes
myquery = "select rownum as id from dual connect by level<=10"
In [7]:
# This maps the Oracle query/table to a Spark DataFrame

df = (spark.read.format("jdbc").
           option("url", f"jdbc:oracle:thin:@{db_connect_string}").
           option("driver", "oracle.jdbc.driver.OracleDriver").
           option("query", myquery).
           option("user", db_user).
           option("password", db_pass).
           option("fetchsize", 10000).
           load()
     )
In [8]:
# Show schema and data for testing purposes
df.printSchema()
df.show()
root
 |-- ID: decimal(38,10) (nullable = true)

+-------------+
|           ID|
+-------------+
| 1.0000000000|
| 2.0000000000|
| 3.0000000000|
| 4.0000000000|
| 5.0000000000|
| 6.0000000000|
| 7.0000000000|
| 8.0000000000|
| 9.0000000000|
|10.0000000000|
+-------------+

In [9]:
# This is the main action:
#   - read from Oracle (see definition of the DataFrame df, above)
#   - write to Parquet files
#   - note: only one file will be written by this, 
#           as the JDBC data source will use only 1 partition

# customize
path = "/tmp/"
table_name = "test"

df.write.mode("overwrite").parquet(path + table_name + ".parquet")
In [10]:
# Show data on the filesystem

# local filesystem
!ls /tmp/test.parquet

# HDFS
# ! hdfs dfs -ls /tmp/test.parquet
part-00000-f2a890a2-d5c3-4358-b1db-005af2990a5e-c000.snappy.parquet  _SUCCESS
In [ ]:
# spark.stop()