{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [ "\"SWAN\"\n", "\"EP-SFT\"\n", "\n", "# Integration of SWAN with Spark clusters" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The current setup allows you to execute PySpark operations on CERN Hadoop and Spark clusters.\n", "\n", "This notebook illustrates the use of __Spark in SWAN to analyze the monitoring data available on HDFS (analytix)__. It plots a heatmap of the load average (loadAvg) across the machines of a particular service (the SWAN production nodes) and a histogram of the uptime of the monitored hosts." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Connect to the cluster (analytix)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "To connect to a cluster, click the star button at the top of the notebook and follow the instructions:\n", "* The star button only appears if you selected a Spark cluster in the session configuration\n", "* The star button becomes active once the notebook kernel is ready\n", "\n", "Once connected, the notebook provides a Spark session object named `spark`, which the cells below use." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Import the necessary Spark and Python dependencies" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col, from_json, from_unixtime, when\n", "from pyspark.sql.types import *" ] },
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import seaborn as sns" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Select the data\n", "*This reads one day (2022-10-14) of collectd load monitoring data stored in Hadoop (HDFS)*" ] },
{ "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Create a DataFrame from the Parquet files containing the collectd load data for 2022-10-14\n", "df = spark.read.parquet(\"hdfs://analytix/project/monitoring/collectd/load/2022/10/14/\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Check the data structure" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- dstype: string (nullable = true)\n", " |-- host: string (nullable = true)\n", " |-- interval: double (nullable = true)\n", " |-- plugin: string (nullable = true)\n", " |-- plugin_instance: string (nullable = true)\n", " |-- time: long (nullable = true)\n", " |-- type: string (nullable = true)\n", " |-- type_instance: string (nullable = true)\n", " |-- env: string (nullable = true)\n", " |-- region: string (nullable = true)\n", " |-- dc: string (nullable = true)\n", " |-- value: double (nullable = true)\n", " |-- value_instance: string (nullable = true)\n", " |-- _id: string (nullable = true)\n", " |-- availability_zone: string (nullable = true)\n", " |-- landb_service_name: string (nullable = true)\n", " |-- event_timestamp: long (nullable = true)\n", " |-- submitter_environment: string (nullable = true)\n", " |-- submitter_hostgroup: string (nullable = true)\n", " |-- timestamp: long (nullable = true)\n", " |-- toplevel_hostgroup: string (nullable = true)\n", " |-- version: string (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create a temporary view" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Register the DataFrame as a temporary view so that it can be queried with Spark SQL\n", "df.createOrReplaceTempView(\"loadAvg\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Do the heavy lifting in Spark and collect the aggregated view into a pandas DataFrame" ] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Aggregate in Spark: average load per host and per hour on the 14th, restricted to the SWAN production nodes\n", "# (timestamp is in milliseconds, hence the division by 1000).\n", "# Only the small aggregated result is fetched into a pandas DataFrame for plotting.\n", "df_loadAvg_pandas = spark.sql(\"\"\"SELECT host,\n", " avg(value) as avg,\n", " hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) as hr\n", " FROM loadAvg\n", " WHERE submitter_hostgroup like 'swan/node/production'\n", " AND dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 14\n", " GROUP BY hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')), host\"\"\").toPandas()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize the load heatmap" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Text(0.5, 1.0, 'Heatmap of loadAvg for cluster on 2022/10/14')" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "\n" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Heatmap of loadAvg: one row per host, one column per hour of the day\n", "plt.figure(figsize=(12, 8))\n", "ax = sns.heatmap(df_loadAvg_pandas.pivot(index='host', columns='hr', values='avg'), cmap=\"Blues\")\n", "ax.set_title(\"Heatmap of loadAvg for cluster on 2022/10/14\", fontsize=20)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create a histogram of uptime for the monitored hosts" ] },
{ "cell_type": "code", "execution_count": 8, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Create a DataFrame from the collectd uptime data for 2022-10-14\n", "df = spark.read.parquet(\"hdfs://analytix/project/monitoring/collectd/uptime/2022/10/14/\")" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# Create a temporary view for Spark SQL\n", "df.createOrReplaceTempView(\"uptime\")" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# Run the query in Spark: maximum uptime per host during the 12:00 hour on the 14th,\n", "# converted from seconds to days, then fetch the result into a pandas DataFrame\n", "df_uptime_pandas = spark.sql(\"\"\"SELECT host, round(max(value)/60/60/24) as days\n", " FROM uptime\n", " WHERE dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 14\n", " AND hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 12\n", " GROUP BY host\"\"\").toPandas()" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "\n" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "# Visualize with seaborn: histogram of uptime (time since the last reboot)\n", "# using 20-day bins with edges from 0 to 1780 days and a logarithmic y axis\n", "plt.figure(figsize=(12, 8))\n", "ax = sns.histplot(df_uptime_pandas['days'], kde=False, color='red', bins=range(0, 1800, 20))\n", "ax.set_title(\"Histogram of uptime\", fontsize=20)\n", "ax.set_yscale('log')" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Stop the Spark session and release the cluster resources\n", "spark.stop()" ] }
],
"metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "sparkconnect": { "bundled_options": [], "list_of_options": [] } },
"nbformat": 4, "nbformat_minor": 2 }