NXCals-example.ipynb

SWAN EP-SFT

Integration of SWAN with Spark clusters


The current setup allows you to execute PySpark operations on the CERN Hadoop and Spark clusters.

This notebook illustrates the use of Spark in SWAN to access CERN Accelerator logging service data.

Connect to the cluster (NXCals)

In the SWAN configuration menu:

  • Choose the NXCals project software stack
  • Choose the NXCals Hadoop cluster

To connect to a cluster, click the star button at the top and follow the instructions:

  • The star button only appears if you have selected a Spark cluster in the configuration
  • The star button becomes active once the notebook kernel is ready
  • Select the NXCALS configuration bundle
  • Access to the cluster and to NXCALS data is controlled by the acc-logging-team; please contact acc-logging-team@cern.ch

NXCals DataExtraction API - Examples

See the NXCals API documentation at: http://nxcals-docs.web.cern.ch/current/

1) Extract data using device/property pairs

Inspect data

In [1]:
# source the nxcals query builders
from nxcals.api.extraction.data.builders import *

# define the extraction Spark DataFrame using the NXCALS API
# see also 
df1 = ( DevicePropertyDataQuery.builder(spark)
         .system('CMW')
         .startTime('2022-11-01 00:00:00.000')
         .endTime('2022-11-02 00:00:00.000')
         .entity()
         .device('RADMON.PS-10')
         .property('ExpertMonitoringAcquisition')
         .build()
      )
23/03/12 21:18:01 WARN URLConfigurationSource: No URLs will be polled as dynamic configuration sources.
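The startTime()/endTime() arguments above are plain strings of the form 'YYYY-MM-DD HH:MM:SS.mmm'. The min/max acqStamp values further down start exactly at 2022-11-01 00:00 UTC, which suggests these strings are interpreted as UTC. The sketch below builds such strings from datetime objects, for example to generate a one-day window in code. nxcals_time and day_window are hypothetical helper names and are not part of the NXCALS API.

```python
from datetime import datetime, timedelta, timezone

def nxcals_time(dt: datetime) -> str:
    """Format dt as 'YYYY-MM-DD HH:MM:SS.mmm', the string form used in the
    startTime()/endTime() calls (hypothetical helper, not NXCALS API).
    Aware datetimes are converted to UTC first; naive ones are taken as UTC."""
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{dt.microsecond // 1000:03d}"

def day_window(day: datetime) -> tuple[str, str]:
    """Return (start, end) strings covering the full day that contains `day`."""
    start = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return nxcals_time(start), nxcals_time(start + timedelta(days=1))

start, end = day_window(datetime(2022, 11, 1))
print(start, end)  # 2022-11-01 00:00:00.000 2022-11-02 00:00:00.000
```

The two strings can then be passed as .startTime(start).endTime(end) in the builder chain.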
In [2]:
df1.printSchema()
root
 |-- PinDiodeVoltageOutOfRange: boolean (nullable = true)
 |-- Radfet1VoltageOutOfRange: boolean (nullable = true)
 |-- Radfet2VoltageOutOfRange: boolean (nullable = true)
 |-- __record_timestamp__: long (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- acqStamp: long (nullable = true)
 |-- class: string (nullable = true)
 |-- current_18V: double (nullable = true)
 |-- current_8V5: double (nullable = true)
 |-- current_memBanks: double (nullable = true)
 |-- current_radfet: double (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- device: string (nullable = true)
 |-- property: string (nullable = true)
 |-- pt100Value: double (nullable = true)
 |-- selector: string (nullable = true)
 |-- temperatureDeported: double (nullable = true)
 |-- temperatureSensorBoard: double (nullable = true)
 |-- voltage_18V: double (nullable = true)
 |-- voltage_3V3: double (nullable = true)
 |-- voltage_3VNeg: double (nullable = true)
 |-- voltage_5V: double (nullable = true)
 |-- voltage_memoryBank2: double (nullable = true)
 |-- voltage_sensorAdcRef: double (nullable = true)
 |-- warningBits: integer (nullable = true)
 |-- nxcals_entity_id: long (nullable = true)

In [3]:
df1.select('acqStamp','voltage_18V','current_18V','device','pt100Value').show()
+-------------------+------------------+------------------+------------+------------------+
|           acqStamp|       voltage_18V|       current_18V|      device|        pt100Value|
+-------------------+------------------+------------------+------------+------------------+
|1667260801078283000|              null|              null|RADMON.PS-10|       109.4169887|
|1667260802078290000|              null| 44.00730505585671|RADMON.PS-10|109.29791806000001|
|1667260803078315000| 20.02644650638104|44.098701328039176|RADMON.PS-10|109.08954444000001|
|1667260805078360000| 20.02644650638104|     44.1291667521|RADMON.PS-10|108.70256486000001|
|1667260807078387000|20.024923235177997|44.464286416769035|RADMON.PS-10|      109.49140785|
|1667260808078408000|              null| 44.02253776788712|RADMON.PS-10|      108.77698401|
|1667260809078413000|              null| 44.40335556864739|RADMON.PS-10|      108.76210018|
|1667260811078467000|20.024923235177997| 44.38812285661698|RADMON.PS-10|              null|
|1667260812078480000|              null|44.053003191947944|RADMON.PS-10|              null|
|1667260813078499000|              null|              null|RADMON.PS-10|      109.05977678|
|1667260814078513000| 20.02796977758408| 44.00730505585671|RADMON.PS-10|108.68768103000001|
|1667260815078547000|20.024923235177997| 44.38812285661698|RADMON.PS-10|      109.22349891|
|1667260816078531000|              null| 44.00730505585671|RADMON.PS-10|      108.95558997|
|1667260817078565000|              null| 44.40335556864739|RADMON.PS-10|109.53605934000001|
|1667260820078616000|              null| 44.11393404006959|RADMON.PS-10|108.97047380000001|
|1667260821078618000| 20.02644650638104|              null|RADMON.PS-10|108.73233252000001|
|1667260822078631000|              null| 43.97683963179589|RADMON.PS-10|109.53605934000001|
|1667260823078639000|              null|44.053003191947944|RADMON.PS-10|109.46164019000001|
|1667260824078638000|              null| 44.31195929646493|RADMON.PS-10|              null|
|1667260829078727000| 20.02644650638104|              null|RADMON.PS-10|108.92582231000002|
+-------------------+------------------+------------------+------------+------------------+
only showing top 20 rows

Register the DataFrame as a temporary view (myData) so that it can be queried directly with SQL.

In [4]:
df1.createOrReplaceTempView("myData")
23/03/12 21:18:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
In [5]:
# a simple select with min / max
spark.sql("select min(acqStamp), max(acqStamp) from myData").show()
+-------------------+-------------------+
|      min(acqStamp)|      max(acqStamp)|
+-------------------+-------------------+
|1667260800078284000|1667347199568841000|
+-------------------+-------------------+
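acqStamp, like the other NXCALS timestamps in this notebook, is an integer count of nanoseconds since the Unix epoch. The stdlib sketch below turns the two values above into readable UTC datetimes. It splits off whole seconds with integer division rather than dividing by 1e9, because a 19-digit integer cannot be represented exactly as a double. ns_to_utc is an illustrative helper name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def ns_to_utc(ns: int) -> datetime:
    """Convert an epoch-nanosecond timestamp (e.g. acqStamp) to an aware UTC datetime.
    datetime only has microsecond resolution, so the last three digits are truncated."""
    seconds, rem_ns = divmod(ns, 1_000_000_000)
    return EPOCH + timedelta(seconds=seconds, microseconds=rem_ns // 1_000)

print(ns_to_utc(1667260800078284000).isoformat())  # 2022-11-01T00:00:00.078284+00:00
print(ns_to_utc(1667347199568841000).isoformat())  # 2022-11-01T23:59:59.568841+00:00
```

On the pandas side, pd.to_datetime(..., unit='ns') does the same conversion. The plotting cell below uses it.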

In [6]:
# perform some aggregations
spark.sql("select device, count(*) from myData group by device").show()
+------------+--------+
|      device|count(1)|
+------------+--------+
|RADMON.PS-10|   86399|
+------------+--------+
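This count, combined with the min/max acqStamp above, gives the average spacing between records. The arithmetic below uses only the numbers printed in these outputs. It shows that RADMON.PS-10 delivers ExpertMonitoringAcquisition about once per second on average over the day.

```python
# Numbers copied from the outputs above (acqStamp is in nanoseconds).
first_ns = 1667260800078284000
last_ns = 1667347199568841000
n_records = 86399

span_s = (last_ns - first_ns) / 1e9          # exact integer difference, then to seconds
mean_interval_s = span_s / (n_records - 1)   # n records span n - 1 intervals
print(f"span = {span_s:.2f} s, mean interval = {mean_interval_s:.4f} s")
# span = 86399.49 s, mean interval = 1.0000 s
```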

Draw a plot using matplotlib

In [7]:
# Select the data columns of interest
# fetch data into a Pandas dataframe for visualization
import pandas as pd

p_df = df1.select('acqStamp','current_18V').filter("current_18V is not null").toPandas()
23/03/12 21:18:29 WARN CheckAllocator: More than one DefaultAllocationManager on classpath. Choosing first found
In [8]:
import matplotlib
from matplotlib.ticker import FuncFormatter, MaxNLocator
%matplotlib inline

data = p_df.sort_values(by='acqStamp')

def format_fn(tick_val, tick_pos):
    # tick values are acqStamp values (ns since the epoch); render them as date/time labels
    return pd.to_datetime(tick_val, unit='ns').strftime('%Y-%m-%d %H:%M')

myplot = data.plot(x='acqStamp', y='current_18V', figsize=(15, 5))
myplot.set_xlabel('Time')
myplot.set_ylabel('Current')

myplot.xaxis.set_major_formatter(FuncFormatter(format_fn))
myplot.xaxis.set_major_locator(MaxNLocator(integer=True))
myplot.tick_params(axis='x', which='major', labelsize='small', labelcolor='r', rotation=45)
[Plot: current_18V of RADMON.PS-10 versus time; x axis "Time", y axis "Current"]

2) Read TGM data

In [9]:
# Map data extraction to a Spark DataFrame using the NXCALS API
tgmData = ( DevicePropertyDataQuery.builder(spark)
             .system('CMW')
             .startTime('2022-11-01 00:00:00.000')
             .endTime('2022-11-02 00:00:00.000')
             .entity()
             .device("CPS.TGM")
             .property("FULL-TELEGRAM.STRC")
             .build()
          )
In [10]:
# fetch only two rows to the driver instead of converting the whole DataFrame to pandas
tgmData.limit(2).toPandas()
Out[10]:
   BATCH  BEAMID  BPNM  COMLN  CYCLE  CYTAG     DEST  DEST2  DURN  FREE14  ...  USER  __record_timestamp__  __record_version__             acqStamp    class           cyclestamp   device            property      selector  nxcals_entity_id
0      0   57090    64   None     48    NaN  PS_DUMP   NONE     1     NaN  ...  ZERO   1667260813900000000                   0  1667260813850000000  CPS.TGM  1667260813900000000  CPS.TGM  FULL-TELEGRAM.STRC  CPS.USER.ALL             46955
1      0   16555    25   None     22    NaN     NTOF   NONE     1     NaN  ...   TOF   1667260848700000000                   0  1667260848650000000  CPS.TGM  1667260848700000000  CPS.TGM  FULL-TELEGRAM.STRC  CPS.USER.ALL             46955

2 rows × 34 columns

In [11]:
# Aggregate and count per user
tgmData.groupBy("USER").count().show()
+--------+-----+
|    USER|count|
+--------+-----+
|     MD4|  263|
| LHCIND2|  212|
|   EAST1| 4205|
|      AD|  746|
|    LHC5|   70|
|     MD6|  276|
|LHCPILOT|   64|
|    ION3| 1095|
|    ZERO|15940|
| SFTPRO3| 3473|
|    ION1| 2168|
|   EAST3| 6691|
|     MD2|  297|
|    LHC3| 1205|
|    MD10|   93|
| SFTPRO1| 1364|
|     TOF|13072|
|     MD5|  117|
| SFTPRO2| 1921|
+--------+-----+
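groupBy("USER").count() returns absolute telegram counts per user. Comparing users is often easier with shares of the total. The output lists 19 users, which is fewer than show()'s default of 20 rows, so this appears to be the complete list. The plain-Python sketch below uses counts copied from that output. In Spark, you could get the same total from tgmData.count().

```python
# Per-user telegram counts copied from the groupBy("USER").count() output above.
counts = {
    "MD4": 263, "LHCIND2": 212, "EAST1": 4205, "AD": 746, "LHC5": 70,
    "MD6": 276, "LHCPILOT": 64, "ION3": 1095, "ZERO": 15940, "SFTPRO3": 3473,
    "ION1": 2168, "EAST3": 6691, "MD2": 297, "LHC3": 1205, "MD10": 93,
    "SFTPRO1": 1364, "TOF": 13072, "MD5": 117, "SFTPRO2": 1921,
}

total = sum(counts.values())
print("total telegrams:", total)

# Print the three most frequent users with their share of all telegrams.
for user, n in sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:3]:
    print(f"{user:8s} {n:6d} {n / total:6.1%}")
```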

DataQuery example

In [12]:
# Define the extraction as a Spark DataFrame, using DataQuery from the NXCALS API
df = ( DataQuery.builder(spark).entities()
       .system("CMW")
       .keyValuesEq({"device": "LHC_STATS", "property": "Calculations"})
       .timeWindow("2022-10-10 00:00:00.0", "2022-11-10 00:00:00.0")
       .build()
     )
In [13]:
df.printSchema()
root
 |-- ALICE_BKGD1: double (nullable = true)
 |-- ALICE_BKGD2: double (nullable = true)
 |-- ALICE_BKGD3: double (nullable = true)
 |-- ALICE_BPTX_DELTA_T: double (nullable = true)
 |-- ALICE_HALF_CROSSING_ANGLE: double (nullable = true)
 |-- ALICE_LUMI_REGION_CENTROID_X: double (nullable = true)
 |-- ALICE_LUMI_REGION_CENTROID_Y: double (nullable = true)
 |-- ALICE_LUMI_REGION_CENTROID_Z: double (nullable = true)
 |-- ALICE_LUMI_REGION_SIZE_X: double (nullable = true)
 |-- ALICE_LUMI_REGION_SIZE_Y: double (nullable = true)
 |-- ALICE_LUMI_REGION_SIZE_Z: double (nullable = true)
 |-- ALICE_SPECTROMETER: double (nullable = true)
 |-- ALICE_TILT_H: double (nullable = true)
 |-- ALICE_TILT_V: double (nullable = true)
 |-- ATLAS_BKGD1: double (nullable = true)
 |-- ATLAS_BKGD2: double (nullable = true)
 |-- ATLAS_BKGD3: double (nullable = true)
 |-- ATLAS_BPTX_DELTA_T: double (nullable = true)
 |-- ATLAS_HALF_CROSSING_ANGLE: double (nullable = true)
 |-- ATLAS_LUMI_REGION_SIZE_X: double (nullable = true)
 |-- ATLAS_LUMI_REGION_SIZE_Y: double (nullable = true)
 |-- ATLAS_LUMI_REGION_SIZE_Z: double (nullable = true)
 |-- ATLAS_TILT_H: double (nullable = true)
 |-- ATLAS_TILT_V: double (nullable = true)
 |-- B1_AVG_BUNCH_INTENSITY: double (nullable = true)
 |-- B1_BUNCH_LENGTH_MEAN_EOF: double (nullable = true)
 |-- B1_BUNCH_LENGTH_MEAN_SSB: double (nullable = true)
 |-- B1_INTENSITY_ATLAS_CMS_SSB: double (nullable = true)
 |-- B1_INTENSITY_DCBCT_SSB: double (nullable = true)
 |-- B1_INTENSITY_SSB: double (nullable = true)
 |-- B1_NUMBER_BUNCHES: integer (nullable = true)
 |-- B1_PARTICLE_TYPE: string (nullable = true)
 |-- B2_AVG_BUNCH_INTENSITY: double (nullable = true)
 |-- B2_BUNCH_LENGTH_MEAN_EOF: double (nullable = true)
 |-- B2_BUNCH_LENGTH_MEAN_SSB: double (nullable = true)
 |-- B2_INTENSITY_ATLAS_CMS_SSB: double (nullable = true)
 |-- B2_INTENSITY_DCBCT_SSB: double (nullable = true)
 |-- B2_INTENSITY_SSB: double (nullable = true)
 |-- B2_NUMBER_BUNCHES: integer (nullable = true)
 |-- B2_PARTICLE_TYPE: string (nullable = true)
 |-- BETA_STAR_ALICE: double (nullable = true)
 |-- BETA_STAR_ATLAS: double (nullable = true)
 |-- BETA_STAR_CMS: double (nullable = true)
 |-- BETA_STAR_LHCB: double (nullable = true)
 |-- BR1_H_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR1_H_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR1_H_ROT_TAIL_AVG: double (nullable = true)
 |-- BR1_V_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR1_V_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR1_V_ROT_TAIL_AVG: double (nullable = true)
 |-- BR2_H_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR2_H_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR2_H_ROT_TAIL_AVG: double (nullable = true)
 |-- BR2_V_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR2_V_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR2_V_ROT_TAIL_AVG: double (nullable = true)
 |-- BR3_H_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR3_H_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR3_H_ROT_TAIL_AVG: double (nullable = true)
 |-- BR3_V_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR3_V_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR3_V_ROT_TAIL_AVG: double (nullable = true)
 |-- BR4_H_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR4_H_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR4_H_ROT_TAIL_AVG: double (nullable = true)
 |-- BR4_V_ROT_EMITTANCE_AVG: double (nullable = true)
 |-- BR4_V_ROT_INTENSITY_AVG: double (nullable = true)
 |-- BR4_V_ROT_TAIL_AVG: double (nullable = true)
 |-- BUNCH_SPACING_B1: integer (nullable = true)
 |-- CMS_BKGD1: double (nullable = true)
 |-- CMS_BKGD2: double (nullable = true)
 |-- CMS_BKGD3: double (nullable = true)
 |-- CMS_BPTX_DELTA_T: double (nullable = true)
 |-- CMS_HALF_CROSSING_ANGLE: double (nullable = true)
 |-- CMS_LUMI_REGION_SIZE_X: double (nullable = true)
 |-- CMS_LUMI_REGION_SIZE_Y: double (nullable = true)
 |-- CMS_LUMI_REGION_SIZE_Z: double (nullable = true)
 |-- CMS_TILT_H: double (nullable = true)
 |-- CMS_TILT_V: double (nullable = true)
 |-- CPS:BRIGHTNESS_AVG_FT: double (nullable = true)
 |-- CPS:BRIGHTNESS_AVG_INJ: double (nullable = true)
 |-- EMITTANCE_BSRT_B1_H_EOF: double (nullable = true)
 |-- EMITTANCE_BSRT_B1_H_SSB: double (nullable = true)
 |-- EMITTANCE_BSRT_B1_V_EOF: double (nullable = true)
 |-- EMITTANCE_BSRT_B1_V_SSB: double (nullable = true)
 |-- EMITTANCE_BSRT_B2_H_EOF: double (nullable = true)
 |-- EMITTANCE_BSRT_B2_H_SSB: double (nullable = true)
 |-- EMITTANCE_BSRT_B2_V_EOF: double (nullable = true)
 |-- EMITTANCE_BSRT_B2_V_SSB: double (nullable = true)
 |-- EMITTANCE_FROM_LUMI_SSB: double (nullable = true)
 |-- ENERGY: double (nullable = true)
 |-- FILL_NUMBER: integer (nullable = true)
 |-- HEATLOAD_SSB_ARC12: double (nullable = true)
 |-- HEATLOAD_SSB_ARC23: double (nullable = true)
 |-- HEATLOAD_SSB_ARC34: double (nullable = true)
 |-- HEATLOAD_SSB_ARC45: double (nullable = true)
 |-- HEATLOAD_SSB_ARC56: double (nullable = true)
 |-- HEATLOAD_SSB_ARC67: double (nullable = true)
 |-- HEATLOAD_SSB_ARC78: double (nullable = true)
 |-- HEATLOAD_SSB_ARC81: double (nullable = true)
 |-- IS_HB: boolean (nullable = true)
 |-- IS_IONS: boolean (nullable = true)
 |-- IS_MD: boolean (nullable = true)
 |-- IS_PROTONS: boolean (nullable = true)
 |-- IS_PROTON_ION: boolean (nullable = true)
 |-- IS_STABLE_BEAMS: boolean (nullable = true)
 |-- IS_TS: boolean (nullable = true)
 |-- LHC:INJECTION_SCHEME: string (nullable = true)
 |-- LHCB_BKGD1: double (nullable = true)
 |-- LHCB_BKGD2: double (nullable = true)
 |-- LHCB_BKGD3: double (nullable = true)
 |-- LHCB_BPTX_DELTA_T: double (nullable = true)
 |-- LHCB_HALF_CROSSING_ANGLE: double (nullable = true)
 |-- LHCB_LUMI_REGION_SIZE_X: double (nullable = true)
 |-- LHCB_LUMI_REGION_SIZE_Y: double (nullable = true)
 |-- LHCB_LUMI_REGION_SIZE_Z: double (nullable = true)
 |-- LHCB_POLARITY: double (nullable = true)
 |-- LHCB_SPECTROMETER: double (nullable = true)
 |-- LHCB_TILT_H: double (nullable = true)
 |-- LHCB_TILT_V: double (nullable = true)
 |-- LHC_BEAM_TYPE: string (nullable = true)
 |-- LUMI_ALICE_DELIVERED: double (nullable = true)
 |-- LUMI_ALICE_PEAK: double (nullable = true)
 |-- LUMI_ATLAS_DELIVERED: double (nullable = true)
 |-- LUMI_ATLAS_PEAK: double (nullable = true)
 |-- LUMI_CMS_DELIVERED: double (nullable = true)
 |-- LUMI_CMS_PEAK: double (nullable = true)
 |-- LUMI_LHCB_DELIVERED: double (nullable = true)
 |-- LUMI_LHCB_PEAK: double (nullable = true)
 |-- LUMI_LIFETIME_ATLAS_EOF: double (nullable = true)
 |-- LUMI_LIFETIME_ATLAS_REL_ERROR_EOF: double (nullable = true)
 |-- LUMI_LIFETIME_ATLAS_REL_ERROR_SSB: double (nullable = true)
 |-- LUMI_LIFETIME_ATLAS_SSB: double (nullable = true)
 |-- LUMI_LIFETIME_CMS_EOF: double (nullable = true)
 |-- LUMI_LIFETIME_CMS_REL_ERROR_EOF: double (nullable = true)
 |-- LUMI_LIFETIME_CMS_REL_ERROR_SSB: double (nullable = true)
 |-- LUMI_LIFETIME_CMS_SSB: double (nullable = true)
 |-- LUMI_LIFETIME_EOF: double (nullable = true)
 |-- LUMI_LIFETIME_REL_ERROR_EOF: double (nullable = true)
 |-- LUMI_LIFETIME_REL_ERROR_SSB: double (nullable = true)
 |-- LUMI_LIFETIME_SSB: double (nullable = true)
 |-- NUMBER_COLLISIONS_IP1_5: integer (nullable = true)
 |-- NUMBER_COLLISIONS_IP2: integer (nullable = true)
 |-- NUMBER_COLLISIONS_IP8: integer (nullable = true)
 |-- PR_H_ROT_EMITTANCE_AVG_FT: double (nullable = true)
 |-- PR_H_ROT_EMITTANCE_AVG_INJ: double (nullable = true)
 |-- PR_H_ROT_INTENSITY_AVG: double (nullable = true)
 |-- PR_H_ROT_INTENSITY_AVG_FT: double (nullable = true)
 |-- PR_H_ROT_TAIL_AVG_FT: double (nullable = true)
 |-- PR_H_ROT_TAIL_AVG_INJ: double (nullable = true)
 |-- PR_V_ROT_EMITTANCE_AVG_FT: double (nullable = true)
 |-- PR_V_ROT_EMITTANCE_AVG_INJ: double (nullable = true)
 |-- PR_V_ROT_INTENSITY_AVG: double (nullable = true)
 |-- PR_V_ROT_INTENSITY_AVG_FT: double (nullable = true)
 |-- PR_V_ROT_TAIL_AVG_FT: double (nullable = true)
 |-- PR_V_ROT_TAIL_AVG_INJ: double (nullable = true)
 |-- PSB:BRIGHTNESS_AVG_R1: double (nullable = true)
 |-- PSB:BRIGHTNESS_AVG_R2: double (nullable = true)
 |-- PSB:BRIGHTNESS_AVG_R3: double (nullable = true)
 |-- PSB:BRIGHTNESS_AVG_R4: double (nullable = true)
 |-- SPS:BUNCH_BRIGHTNESS_AVG_IN: double (nullable = true)
 |-- SPS_H_ROT:EMITTANCE_AVG_FT: double (nullable = true)
 |-- SPS_H_ROT:EMITTANCE_AVG_INJ: double (nullable = true)
 |-- SPS_H_ROT_IN:BUNCH_INT: double (nullable = true)
 |-- SPS_H_ROT_IN:EMITTANCE: double (nullable = true)
 |-- SPS_H_ROT_IN:ENERGY: double (nullable = true)
 |-- SPS_H_ROT_IN:TAIL_POP: double (nullable = true)
 |-- SPS_V_ROT:EMITTANCE_AVG_FT: double (nullable = true)
 |-- SPS_V_ROT:EMITTANCE_AVG_INJ: double (nullable = true)
 |-- SPS_V_ROT_IN:BUNCH_INT: double (nullable = true)
 |-- SPS_V_ROT_IN:EMITTANCE: double (nullable = true)
 |-- SPS_V_ROT_IN:ENERGY: double (nullable = true)
 |-- SPS_V_ROT_IN:TAIL_POP: double (nullable = true)
 |-- TIME_DURATION_BM_ACCESS_NO_BEAM: long (nullable = true)
 |-- TIME_DURATION_BM_BEAM_IN: long (nullable = true)
 |-- TIME_DURATION_BM_MACHINE_SETUP: long (nullable = true)
 |-- TIME_DURATION_BM_RAMP_SQUEEZE: long (nullable = true)
 |-- TIME_DURATION_BM_STABLE: long (nullable = true)
 |-- TIME_DURATION_FILL: long (nullable = true)
 |-- TIME_DURATION_FTSQAD: long (nullable = true)
 |-- TIME_DURATION_INJECTION: long (nullable = true)
 |-- TIME_DURATION_RAMP: long (nullable = true)
 |-- TIME_DURATION_SETUP: long (nullable = true)
 |-- TIME_DURATION_STABLEBEAMS: long (nullable = true)
 |-- TIME_DURATION_TURNAROUND: long (nullable = true)
 |-- TIME_END_FILL: long (nullable = true)
 |-- TIME_END_STABLEBEAM: long (nullable = true)
 |-- TIME_START_FILL: long (nullable = true)
 |-- TIME_START_FLATTOP: long (nullable = true)
 |-- TIME_START_INJECTION: long (nullable = true)
 |-- TIME_START_STABLEBEAMS: long (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- __record_timestamp__: long (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- acqStamp: long (nullable = true)
 |-- class: string (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- device: string (nullable = true)
 |-- property: string (nullable = true)
 |-- selector: string (nullable = true)
 |-- nxcals_entity_id: long (nullable = true)
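Several LHC_STATS columns contain a colon, for example LHC:INJECTION_SCHEME and CPS:BRIGHTNESS_AVG_FT. Inside a Spark SQL string (spark.sql or selectExpr), such names are not valid bare identifiers. They must be wrapped in backticks, and any backtick in the name must be doubled. The sketch below builds such a query string. quote_ident and select_sql are hypothetical helper names, and lhcStats is an assumed view name, for example one created with df.createOrReplaceTempView("lhcStats").

```python
def quote_ident(name: str) -> str:
    """Backtick-quote an identifier for use inside a Spark SQL string,
    doubling any backtick that appears in the name itself."""
    return "`" + name.replace("`", "``") + "`"

def select_sql(view: str, columns: list[str]) -> str:
    """Build a simple SELECT over `view` with every column name quoted."""
    cols = ", ".join(quote_ident(c) for c in columns)
    return f"select {cols} from {quote_ident(view)}"

query = select_sql("lhcStats", ["acqStamp", "LHC:INJECTION_SCHEME", "CPS:BRIGHTNESS_AVG_FT"])
print(query)
# select `acqStamp`, `LHC:INJECTION_SCHEME`, `CPS:BRIGHTNESS_AVG_FT` from `lhcStats`
```

You can then pass the resulting string to spark.sql(query).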

In [14]:
df.select("acqStamp", "device", "LUMI_ATLAS_PEAK").show()
+-------------------+---------+--------------------+
|           acqStamp|   device|     LUMI_ATLAS_PEAK|
+-------------------+---------+--------------------+
|1667905679423738525|LHC_STATS|                null|
|1667864783774363525|LHC_STATS|  3.5779993534088135|
|1667822484470738525|LHC_STATS|                null|
|1667774087439988525|LHC_STATS|                null|
|1667703231948988525|LHC_STATS|0.002851842204108...|
|1667629713050363525|LHC_STATS|     19311.009765625|
|1667490492435738525|LHC_STATS|                null|
|1667321660221238525|LHC_STATS|     19136.162109375|
|1667218881892238525|LHC_STATS|                null|
|1667152839888738525|LHC_STATS|                null|
|1667010181393738525|LHC_STATS|       19072.6171875|
|1666650939478488525|LHC_STATS|       19087.2265625|
|1666353810345738525|LHC_STATS|                null|
|1666340687922238525|LHC_STATS|    2665.25634765625|
|1666339796457488525|LHC_STATS|                null|
|1666268334454488525|LHC_STATS|      18732.56640625|
|1666263572235863525|LHC_STATS|                null|
|1666185949344738525|LHC_STATS|   2549.858154296875|
|1666049957622238525|LHC_STATS|                null|
|1666016050093613525|LHC_STATS|                null|
+-------------------+---------+--------------------+
only showing top 20 rows

BPT downsampling

In [15]:
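# Legacy NXCALS query builder API. Importing the module under an alias (instead of
# "import *") keeps it from shadowing the DataQuery class imported in cell [1].
import cern.nxcals.pyquery.builders as legacy_builders
from pyspark.sql.functions import col, floor, timestamp_seconds, window

sdf = legacy_builders.DataQuery(spark).byVariables() \
    .system('CMW') \
    .startTime('2022-01-01 00:00:00.00') \
    .endTime('2022-12-31 00:00:00.00') \
    .variables(['LHC.BCTFR.A6R4.B1:BEAM_INTENSITY', 'LHC.BCTFR.A6R4.B2:BEAM_INTENSITY']) \
    .buildDataset()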
In [16]:
# extract with downsampling: average data on bins of 1 h
# nxcals_timestamp is in ns (3600e9 ns = 1 h); time_bin is the bin start in seconds since the epoch
sdf_ds = ( sdf.withColumn('time_bin', floor(col('nxcals_timestamp') / 3600e9) * 3600)
              .groupBy(['nxcals_variable_name', 'time_bin']).mean('nxcals_value')
              .withColumnRenamed('avg(nxcals_value)', 'nxcals_value')
              .orderBy('time_bin')
         )
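The cell above assigns each sample to a one-hour bin, time_bin = floor(nxcals_timestamp / 3600e9) * 3600, which is the bin start in seconds since the epoch. It then averages nxcals_value per (variable, bin). The same logic in plain Python is shown below as a sketch for checking the arithmetic. It uses integer floor division, so the nanosecond timestamp is never converted to a double. The samples are made-up illustrative values, not NXCALS data, and hourly_means is a hypothetical name. Note that 1640995200 is 2022-01-01 00:00 UTC, the first time_bin in the output below.

```python
from collections import defaultdict

NS_PER_HOUR = 3_600_000_000_000

def hourly_means(samples):
    """Average (variable, timestamp_ns, value) samples into one-hour bins.

    Returns {(variable, bin_start_seconds): mean_value}, mirroring the
    groupBy(['nxcals_variable_name', 'time_bin']).mean('nxcals_value') step.
    Values are assumed to be non-null."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for name, ts_ns, value in samples:
        time_bin = (ts_ns // NS_PER_HOUR) * 3600  # bin start, seconds since the epoch
        sums[(name, time_bin)] += value
        counts[(name, time_bin)] += 1
    return {key: sums[key] / counts[key] for key in sorted(sums)}

# Made-up samples: two in the first hour of 2022 (UTC), one in the second hour.
samples = [
    ("B1", 1640995200_000000000, 1.0),
    ("B1", 1640997000_000000000, 3.0),
    ("B1", 1640998800_000000000, 5.0),
]
print(hourly_means(samples))
# {('B1', 1640995200): 2.0, ('B1', 1640998800): 5.0}
```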
In [17]:
sdf_ds.count()
Out[17]:
16869
In [21]:
sdf_ds.show(3, False)
+--------------------------------+----------+------------+
|nxcals_variable_name            |time_bin  |nxcals_value|
+--------------------------------+----------+------------+
|LHC.BCTFR.A6R4.B2:BEAM_INTENSITY|1640995200|0.0         |
|LHC.BCTFR.A6R4.B1:BEAM_INTENSITY|1640995200|0.0         |
|LHC.BCTFR.A6R4.B2:BEAM_INTENSITY|1640998800|0.0         |
+--------------------------------+----------+------------+
only showing top 3 rows

In [22]:
# Another way to create time bins: Spark's window() function (here with 10-minute windows)

sdf.withColumn('time_bin', window(timestamp_seconds(col('nxcals_timestamp')/1e9), '10 minutes')).show(3)
+--------------------+----------------+-------------------+--------------------+--------------------+
|        nxcals_value|nxcals_entity_id|   nxcals_timestamp|nxcals_variable_name|            time_bin|
+--------------------+----------------+-------------------+--------------------+--------------------+
|3.363701276527677...|           46424|1666656000010076000|LHC.BCTFR.A6R4.B2...|{2022-10-25 02:00...|
|3.362054560455108E14|           46420|1666656001010080000|LHC.BCTFR.A6R4.B1...|{2022-10-25 02:00...|
|3.363426610018308E14|           46424|1666656001010080000|LHC.BCTFR.A6R4.B2...|{2022-10-25 02:00...|
+--------------------+----------------+-------------------+--------------------+--------------------+
only showing top 3 rows
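window() builds tumbling windows aligned to the Unix epoch, and show() displays their bounds in the Spark session time zone. The first sample above, at 1666656000010076000 ns, is 2022-10-25 00:00:00 UTC. Its window is nevertheless displayed as starting at 02:00. This is consistent with a session time zone of UTC+2, i.e. Central European Summer Time, which was still in effect on that date (summer time ended on 30 October 2022). The stdlib sketch below reproduces the window computation. The fixed UTC+2 offset is an assumption about the session configuration, not a value read from it, and tumbling_window is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the Spark session time zone was UTC+2 on 2022-10-25.
ASSUMED_SESSION_TZ = timezone(timedelta(hours=2), "CEST")

def epoch_seconds_to_utc(seconds: int) -> datetime:
    """Convert whole seconds since the epoch to an aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

def tumbling_window(ts_ns: int, width_s: int = 600) -> tuple[datetime, datetime]:
    """Return the epoch-aligned [start, end) window of width_s seconds containing ts_ns."""
    seconds = ts_ns // 1_000_000_000
    start = seconds - seconds % width_s
    return epoch_seconds_to_utc(start), epoch_seconds_to_utc(start + width_s)

start, end = tumbling_window(1666656000010076000)
print(start.isoformat(), "->", end.isoformat())
# 2022-10-25T00:00:00+00:00 -> 2022-10-25T00:10:00+00:00
print(start.astimezone(ASSUMED_SESSION_TZ).strftime("%Y-%m-%d %H:%M"))
# 2022-10-25 02:00
```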

In [20]:
# spark.stop()