Integration of SWAN with Spark clusters
The current setup makes it possible to execute PySpark operations on the CERN Hadoop and Spark clusters.
This notebook illustrates the use of Spark in SWAN to access CERN Accelerator Logging Service (NXCALS) data.
Connect to the cluster (NXCals)¶
In the SWAN configuration menu:
- Choose the NXCALS project software stack
- Choose the NXCALS Hadoop cluster
To connect to the cluster, click the star button at the top and follow the instructions
- The star button only appears if you have selected a Spark cluster in the configuration
- The star button becomes active once the notebook kernel is ready
- Select the NXCALS configuration bundle
- Access to the cluster and to NXCALS data is controlled by the acc-logging-team; to request access, contact acc-logging-team@cern.ch
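Once connected, the session exposes a ready-to-use SparkSession as the spark variable (it is used by all the cells below). A minimal sanity check, assuming the default SWAN setup:
In [ ]:
# quick sanity check: SWAN exposes the connected session as `spark`
print(spark.version)               # Spark version of the attached cluster
print(spark.sparkContext.master)   # typically 'yarn' for the NXCALS Hadoop cluster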
NXCals DataExtraction API - Examples¶
See the NXCALS API documentation at: http://nxcals-docs.web.cern.ch/current/
1) Extract data using device/property pairs¶
Inspect data¶
In [1]:
# import the NXCALS query builders
from nxcals.api.extraction.data.builders import *
# build the extraction Spark DataFrame using the NXCALS API
# see also the NXCALS documentation linked above
df1 = ( DevicePropertyDataQuery.builder(spark)
.system('CMW')
.startTime('2022-11-01 00:00:00.000')
.endTime('2022-11-02 00:00:00.000')
.entity()
.device('RADMON.PS-10')
.property('ExpertMonitoringAcquisition')
.build()
)
In [2]:
df1.printSchema()
In [3]:
df1.select('acqStamp','voltage_18V','current_18V','device','pt100Value').show()
Register the DataFrame as a temporary view (myData) to write direct SQL queries.¶
In [4]:
df1.createOrReplaceTempView("myData")
In [5]:
# a simple select with min / max
spark.sql("select min(acqStamp), max(acqStamp) from myData").show()
In [6]:
# perform some aggregations
spark.sql("select device, count(*) from myData group by device").show()
Draw a plot using matplotlib¶
In [7]:
# Select the data columns of interest
# fetch data into a Pandas dataframe for visualization
import pandas as pd
p_df = df1.select('acqStamp','current_18V').filter("current_18V is not null").toPandas()
In [8]:
import matplotlib
from matplotlib.ticker import FuncFormatter, MaxNLocator
%matplotlib inline
data = p_df.sort_values(by='acqStamp')

# acqStamp is in nanoseconds since the epoch; convert tick values to readable timestamps
def format_fn(tick_val, tick_pos):
    return str(pd.to_datetime(tick_val, unit='ns'))

myplot = data.plot(x='acqStamp', y='current_18V', figsize=(15, 5))
myplot.set_xlabel('Time')
myplot.set_ylabel('Current')
myplot.xaxis.set_major_formatter(FuncFormatter(format_fn))
myplot.xaxis.set_major_locator(MaxNLocator(integer=True))
myplot.tick_params(axis='x', which='major', labelsize='small', labelcolor='r', rotation=45)
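Alternatively, the nanosecond timestamps can be converted to datetimes in pandas before plotting, letting matplotlib label the time axis directly. A minimal sketch using the p_df dataframe from above:
In [ ]:
# sketch: convert acqStamp (ns since the epoch) to datetimes for native time-axis labels
data = p_df.sort_values(by='acqStamp')
data['time'] = pd.to_datetime(data['acqStamp'], unit='ns')
ax = data.plot(x='time', y='current_18V', figsize=(15, 5))
ax.set_xlabel('Time')
ax.set_ylabel('Current')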
2) Read TGM data¶
In [9]:
# Map data extraction to a Spark DataFrame using the NXCALS API
tgmData = ( DevicePropertyDataQuery.builder(spark)
.system('CMW')
.startTime('2022-11-01 00:00:00.000')
.endTime('2022-11-02 00:00:00.000')
.entity()
.device("CPS.TGM")
.property("FULL-TELEGRAM.STRC")
.build()
)
In [10]:
tgmData.toPandas()[:2]
In [11]:
# Aggregate and count per user
tgmData.groupBy("USER").count().show()
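The same aggregation can be sorted to show the most frequent users first; a small sketch using the standard DataFrame API:
In [ ]:
# sketch: per-user counts, most frequent first
tgmData.groupBy('USER').count().orderBy('count', ascending=False).show(10)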
3) DataQuery example¶
In [12]:
# map the data into a DataFrame, extracting by key-values with the NXCALS DataQuery API
df = ( DataQuery.builder(spark).entities()
.system("CMW")
.keyValuesEq({"device": "LHC_STATS", "property": "Calculations"})
.timeWindow("2022-10-10 00:00:00.0", "2022-11-10 00:00:00.0")
.build()
)
In [13]:
df.printSchema()
In [14]:
df.select("acqStamp", "device", "LUMI_ATLAS_PEAK").show()
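The DataFrame can also be aggregated directly; for example, the maximum of the LUMI_ATLAS_PEAK column (from the cell above) over the whole time window. A minimal sketch:
In [ ]:
# sketch: peak value over the full window (max ignores null rows)
from pyspark.sql.functions import max as spark_max
df.agg(spark_max('LUMI_ATLAS_PEAK')).show()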
4) BCT downsampling¶
In [15]:
# the pyquery builders provide a variable-name-based query interface
from cern.nxcals.pyquery.builders import *
from pyspark.sql.functions import *
# build a dataset with the two LHC beam-intensity (BCT) variables
sdf = DataQuery(spark).byVariables() \
.system('CMW') \
.startTime('2022-01-01 00:00:00.00') \
.endTime('2022-12-31 00:00:00.00') \
.variables(['LHC.BCTFR.A6R4.B1:BEAM_INTENSITY', 'LHC.BCTFR.A6R4.B2:BEAM_INTENSITY']) \
.buildDataset()
In [16]:
# extract with downsampling:
# average the data in 1-hour bins (nxcals_timestamp is in nanoseconds)
sdf_ds = ( sdf.withColumn('time_bin', floor(col('nxcals_timestamp') / 3600e9)*3600)
           .groupBy(['nxcals_variable_name', 'time_bin']).mean('nxcals_value')
           .withColumnRenamed('avg(nxcals_value)', 'nxcals_value')
           .orderBy('time_bin')
         )
In [17]:
sdf_ds.count()
In [21]:
sdf_ds.show(3, False)
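The hourly averages are small enough to collect locally; a sketch that converts time_bin (seconds since the epoch) back to datetimes, e.g. for plotting with pandas:
In [ ]:
# sketch: collect the downsampled data and restore readable timestamps
p_ds = sdf_ds.toPandas()
p_ds['time'] = pd.to_datetime(p_ds['time_bin'], unit='s')
p_ds.head()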
In [22]:
# Show another method for creating time bins
sdf.withColumn('time_bin', window(timestamp_seconds(col('nxcals_timestamp')/1e9), '10 minutes')).show(3)
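The window column can then serve as a grouping key, reproducing the downsampling above with Spark's built-in time windows. A minimal sketch:
In [ ]:
# sketch: average per variable over 10-minute windows using the window column
( sdf.withColumn('time_bin', window(timestamp_seconds(col('nxcals_timestamp')/1e9), '10 minutes'))
     .groupBy('nxcals_variable_name', 'time_bin')
     .mean('nxcals_value')
     .show(3, False)
)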
In [20]:
# stop the Spark session when done with the cluster
# spark.stop()