Demo_Spark_on_Hadoop.ipynb


Integration of SWAN with Spark clusters


The current setup allows executing PySpark operations on the CERN Hadoop and Spark clusters.

This notebook illustrates how to use Spark from SWAN to analyze the monitoring data available on HDFS (analytix) and to plot a heatmap of loadAvg across the machines of a particular service.

Connect to the cluster (analytix)

To connect to a cluster, click on the star button at the top and follow the instructions (a quick check of the resulting session is sketched after the list below):

  • The star button only appears if you have selected a Spark cluster in the configuration
  • The star button becomes active once the notebook kernel is ready
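
After connecting, the SWAN Spark connector typically exposes a SparkSession in the notebook as `spark` (and a SparkContext as `sc`). A minimal sanity check of the session, assuming the connection succeeded, could look like this:

In [ ]:
# Assumes the star-button connection completed and injected a SparkSession as `spark`
print(spark.version)                     # Spark version of the attached cluster
print(spark.sparkContext.applicationId)  # application id of this Spark session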

Import necessary Spark and Python dependencies

In [1]:
# Spark SQL functions and types used in the analysis below
from pyspark.sql.functions import from_unixtime, from_json, when, col
from pyspark.sql.types import *
In [2]:
%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

Select the data

This reads the monitoring data stored in Hadoop (collectd load metrics in Parquet format).

In [3]:
# Create the dataframe from the parquet files containing monitoring data
df = spark.read.parquet("hdfs://analytix/project/monitoring/collectd/load/2022/10/14/")

Check the data structure

In [4]:
df.printSchema()
root
 |-- dstype: string (nullable = true)
 |-- host: string (nullable = true)
 |-- interval: double (nullable = true)
 |-- plugin: string (nullable = true)
 |-- plugin_instance: string (nullable = true)
 |-- time: long (nullable = true)
 |-- type: string (nullable = true)
 |-- type_instance: string (nullable = true)
 |-- env: string (nullable = true)
 |-- region: string (nullable = true)
 |-- dc: string (nullable = true)
 |-- value: double (nullable = true)
 |-- value_instance: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- availability_zone: string (nullable = true)
 |-- landb_service_name: string (nullable = true)
 |-- event_timestamp: long (nullable = true)
 |-- submitter_environment: string (nullable = true)
 |-- submitter_hostgroup: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- toplevel_hostgroup: string (nullable = true)
 |-- version: string (nullable = true)
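
To get a feel for the records before aggregating, a quick peek at a few rows (not part of the original flow) could look like this:

In [ ]:
# Show a handful of raw load records; the columns match the schema printed above
df.select("host", "type_instance", "value", "timestamp").show(5, truncate=False)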

Create a temporary table view

In [5]:
df.createOrReplaceTempView("loadAvg")

Do the heavy lifting in Spark and collect the aggregated view into a Pandas DataFrame

In [6]:
# Extract the data by running a Spark SQL query.
# Fetch the results into a Pandas DataFrame for later plotting.
df_loadAvg_pandas = spark.sql("""SELECT host,
                                      avg(value) as avg,
                                      hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) as hr
                               FROM loadAvg
                               WHERE submitter_hostgroup like 'swan/node/production'
                               AND dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 14
                               GROUP BY hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')), host""").toPandas()
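
The same aggregation can also be expressed with the DataFrame API instead of SQL. The sketch below is an equivalent formulation (not part of the original notebook) that reuses the functions imported earlier:

In [ ]:
# Sketch: DataFrame-API equivalent of the SQL query above
from pyspark.sql.functions import hour, dayofmonth, avg

ts = from_unixtime(col("timestamp") / 1000, "yyyy-MM-dd HH:mm:ss")
df_loadAvg_alt = (df
    .where(col("submitter_hostgroup").like("swan/node/production"))
    .where(dayofmonth(ts) == 14)
    .groupBy(hour(ts).alias("hr"), "host")
    .agg(avg("value").alias("avg")))
# df_loadAvg_alt.toPandas() would yield the same columns (host, avg, hr) as df_loadAvg_pandas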

Visualize load heatmap

In [7]:
# heatmap of loadAvg
plt.figure(figsize=(12, 8))
ax = sns.heatmap(df_loadAvg_pandas.pivot(index='host', columns='hr', values='avg'), cmap="Blues")
ax.set_title("Heatmap of loadAvg for cluster on 2022/10/14", fontsize=20)
Out[7]:
Text(0.5, 1.0, 'Heatmap of loadAvg for cluster on 2022/10/14')
[Output image: heatmap of hourly loadAvg per host on 2022/10/14]

Create a histogram of uptime for the monitored entities

In [8]:
# Create the DataFrame from the Parquet files containing uptime data
df = spark.read.parquet("hdfs://analytix/project/monitoring/collectd/uptime/2022/10/14/")
In [9]:
# Create a temporary table view
df.createOrReplaceTempView("uptime")
In [10]:
# Extract the data by running a Spark SQL query.
# Fetch the results into a Pandas DataFrame.
df_uptime_pandas = spark.sql("""SELECT  host, round(max(value)/60/60/24) as days
                              FROM uptime
                               WHERE dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 14
                               AND hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 12
                               GROUP BY host""").toPandas()
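
Before plotting, it can be useful to sanity-check the fetched values (an optional step, not in the original notebook); `days` is the maximum reported uptime in seconds converted to days:

In [ ]:
# Inspect the uptime values fetched into Pandas
print(df_uptime_pandas.head())
print(df_uptime_pandas['days'].describe())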
In [11]:
# visualize with seaborn
# histogram of uptime (time since last reboot)

plt.figure(figsize=(12, 8))
ax = sns.histplot(df_uptime_pandas['days'], kde=False, color='red', bins=range(0, 1800, 20))
ax.set_title("Histogram of uptime", fontsize=20)
ax.set_yscale('log')
[Output image: histogram of machine uptime in days, with logarithmic y-axis]
In [ ]:
spark.stop()
In [ ]: