{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Integration of SWAN with Spark clusters" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The current setup lets you run PySpark operations on the CERN Hadoop and Spark clusters.\n", "\n", "This notebook illustrates the use of __Spark in SWAN to access CERN Accelerator logging service data__." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Connect to the cluster (NXCals)\n", "In the SWAN configuration menu:\n", "- Choose the NXCals project software stack\n", "- Choose the NXCals Hadoop cluster" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "To connect to a cluster, click the star button at the top and follow the instructions:\n", "* The star button only appears if you have selected a Spark cluster in the configuration\n", "* The star button becomes active once the notebook kernel is ready\n", "* Select the NXCALS configuration bundle\n", "* Access to the cluster and to NXCALS data is controlled by acc-logging-team; please contact acc-logging-team@cern.ch\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# NXCals DataExtraction API - Examples\n", "See the NXCals API documentation at: http://nxcals-docs.web.cern.ch/current/" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## 1) Extract data using device/property pairs" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Inspect data" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "23/03/12 21:18:01 WARN URLConfigurationSource: No URLs will be polled as dynamic configuration sources.\n" ] } ], "source": [ "# import the NXCALS query builders\n", "from nxcals.api.extraction.data.builders import *\n", "\n", "# define the extraction as a Spark DataFrame using the NXCALS API\n", "# (one day of data for a single device/property pair)\n", "df1 = ( DevicePropertyDataQuery.builder(spark)\n", " .system('CMW')\n", " .startTime('2022-11-01 00:00:00.000')\n", " .endTime('2022-11-02 00:00:00.000')\n", " .entity()\n", " .device('RADMON.PS-10')\n", " 
.property('ExpertMonitoringAcquisition')\n", " .build()\n", " )" ] },
{ "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- PinDiodeVoltageOutOfRange: boolean (nullable = true)\n", " |-- Radfet1VoltageOutOfRange: boolean (nullable = true)\n", " |-- Radfet2VoltageOutOfRange: boolean (nullable = true)\n", " |-- __record_timestamp__: long (nullable = true)\n", " |-- __record_version__: long (nullable = true)\n", " |-- acqStamp: long (nullable = true)\n", " |-- class: string (nullable = true)\n", " |-- current_18V: double (nullable = true)\n", " |-- current_8V5: double (nullable = true)\n", " |-- current_memBanks: double (nullable = true)\n", " |-- current_radfet: double (nullable = true)\n", " |-- cyclestamp: long (nullable = true)\n", " |-- device: string (nullable = true)\n", " |-- property: string (nullable = true)\n", " |-- pt100Value: double (nullable = true)\n", " |-- selector: string (nullable = true)\n", " |-- temperatureDeported: double (nullable = true)\n", " |-- temperatureSensorBoard: double (nullable = true)\n", " |-- voltage_18V: double (nullable = true)\n", " |-- voltage_3V3: double (nullable = true)\n", " |-- voltage_3VNeg: double (nullable = true)\n", " |-- voltage_5V: double (nullable = true)\n", " |-- voltage_memoryBank2: double (nullable = true)\n", " |-- voltage_sensorAdcRef: double (nullable = true)\n", " |-- warningBits: integer (nullable = true)\n", " |-- nxcals_entity_id: long (nullable = true)\n", "\n" ] } ], "source": [ "df1.printSchema()" ] },
{ "cell_type": "code", "execution_count": 3, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+------------------+------------------+------------+------------------+\n", "| acqStamp| voltage_18V| current_18V| device| pt100Value|\n", "+-------------------+------------------+------------------+------------+------------------+\n", "|1667260801078283000| null| null|RADMON.PS-10| 109.4169887|\n", "|1667260802078290000| null| 44.00730505585671|RADMON.PS-10|109.29791806000001|\n", "|1667260803078315000| 20.02644650638104|44.098701328039176|RADMON.PS-10|109.08954444000001|\n", "|1667260805078360000| 20.02644650638104| 44.1291667521|RADMON.PS-10|108.70256486000001|\n", "|1667260807078387000|20.024923235177997|44.464286416769035|RADMON.PS-10| 109.49140785|\n", "|1667260808078408000| null| 44.02253776788712|RADMON.PS-10| 108.77698401|\n", "|1667260809078413000| null| 44.40335556864739|RADMON.PS-10| 108.76210018|\n", "|1667260811078467000|20.024923235177997| 44.38812285661698|RADMON.PS-10| null|\n", "|1667260812078480000| null|44.053003191947944|RADMON.PS-10| null|\n", "|1667260813078499000| null| null|RADMON.PS-10| 109.05977678|\n", "|1667260814078513000| 20.02796977758408| 44.00730505585671|RADMON.PS-10|108.68768103000001|\n", "|1667260815078547000|20.024923235177997| 44.38812285661698|RADMON.PS-10| 109.22349891|\n", "|1667260816078531000| null| 44.00730505585671|RADMON.PS-10| 108.95558997|\n", "|1667260817078565000| null| 44.40335556864739|RADMON.PS-10|109.53605934000001|\n", "|1667260820078616000| null| 44.11393404006959|RADMON.PS-10|108.97047380000001|\n", "|1667260821078618000| 20.02644650638104| null|RADMON.PS-10|108.73233252000001|\n", "|1667260822078631000| null| 43.97683963179589|RADMON.PS-10|109.53605934000001|\n", "|1667260823078639000| null|44.053003191947944|RADMON.PS-10|109.46164019000001|\n", "|1667260824078638000| null| 44.31195929646493|RADMON.PS-10| null|\n", "|1667260829078727000| 20.02644650638104| null|RADMON.PS-10|108.92582231000002|\n", "+-------------------+------------------+------------------+------------+------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "df1.select('acqStamp','voltage_18V','current_18V','device','pt100Value').show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Register the DataFrame as a temporary view (myData) to query it directly with SQL" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "23/03/12 21:18:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n" ] } ], "source": [ "df1.createOrReplaceTempView(\"myData\")" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-------------------+\n", "| min(acqStamp)| max(acqStamp)|\n", "+-------------------+-------------------+\n", "|1667260800078284000|1667347199568841000|\n", "+-------------------+-------------------+\n", "\n" ] } ], "source": [ "# find the earliest and latest acquisition timestamps\n", "spark.sql(\"select min(acqStamp), max(acqStamp) from myData\").show()" ] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+--------+\n", "| device|count(1)|\n", "+------------+--------+\n", "|RADMON.PS-10| 86399|\n", "+------------+--------+\n", "\n" ] } ], "source": [ "# count the records per device\n", "spark.sql(\"select device, count(*) from myData group by device\").show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Draw a plot using matplotlib" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "23/03/12 21:18:29 WARN CheckAllocator: More than one DefaultAllocationManager on classpath. 
Choosing first found\n" ] } ], "source": [ "# Select the columns of interest, drop null readings,\n", "# and collect the result into a pandas DataFrame for plotting\n", "import pandas as pd\n", "\n", "p_df = df1.select('acqStamp','current_18V').filter(\"current_18V is not null\").toPandas()\n" ] },
{ "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "from matplotlib.ticker import FuncFormatter, MaxNLocator\n", "%matplotlib inline\n", "\n", "# sort by acquisition time so the line is drawn in chronological order\n", "data = p_df.sort_values(by='acqStamp')\n", "\n", "def format_fn(tick_val, tick_pos):\n", "    # acqStamp is in nanoseconds since the epoch; label ticks as timestamps\n", "    return str(pd.to_datetime(tick_val, unit='ns'))\n", "\n", "myplot = data.plot(x='acqStamp', y='current_18V', figsize=(15,5))\n", "myplot.set_xlabel('Time')\n", "myplot.set_ylabel('Current')\n", "\n", "myplot.xaxis.set_major_formatter(FuncFormatter(format_fn))\n", "myplot.xaxis.set_major_locator(MaxNLocator(integer=True))\n", "myplot.tick_params(axis='x', which='major', labelsize='small', labelcolor='r', rotation=45)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## 2) Read TGM data" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# Map data extraction to a Spark DataFrame using the NXCALS API\n", "tgmData = ( DevicePropertyDataQuery.builder(spark)\n", " .system('CMW')\n", " .startTime('2022-11-01 00:00:00.000')\n", " .endTime('2022-11-02 00:00:00.000')\n", " .entity()\n", " .device(\"CPS.TGM\")\n", " .property(\"FULL-TELEGRAM.STRC\")\n", " .build()\n", " )" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n",
"<table>\n",
"<thead>\n",
"<tr><th></th><th>BATCH</th><th>BEAMID</th><th>BPNM</th><th>COMLN</th><th>CYCLE</th><th>CYTAG</th><th>DEST</th><th>DEST2</th><th>DURN</th><th>FREE14</th><th>...</th><th>USER</th><th>__record_timestamp__</th><th>__record_version__</th><th>acqStamp</th><th>class</th><th>cyclestamp</th><th>device</th><th>property</th><th>selector</th><th>nxcals_entity_id</th></tr>\n",
"</thead>\n",
"<tbody>\n",
"<tr><th>0</th><td>0</td><td>57090</td><td>64</td><td>None</td><td>48</td><td>NaN</td><td>PS_DUMP</td><td>NONE</td><td>1</td><td>NaN</td><td>...</td><td>ZERO</td><td>1667260813900000000</td><td>0</td><td>1667260813850000000</td><td>CPS.TGM</td><td>1667260813900000000</td><td>CPS.TGM</td><td>FULL-TELEGRAM.STRC</td><td>CPS.USER.ALL</td><td>46955</td></tr>\n",
"<tr><th>1</th><td>0</td><td>16555</td><td>25</td><td>None</td><td>22</td><td>NaN</td><td>NTOF</td><td>NONE</td><td>1</td><td>NaN</td><td>...</td><td>TOF</td><td>1667260848700000000</td><td>0</td><td>1667260848650000000</td><td>CPS.TGM</td><td>1667260848700000000</td><td>CPS.TGM</td><td>FULL-TELEGRAM.STRC</td><td>CPS.USER.ALL</td><td>46955</td></tr>\n",
"</tbody>\n",
"</table>\n",
"<p>2 rows × 34 columns</p>\n",
"</div>" ], "text/plain": [ " BATCH BEAMID BPNM COMLN CYCLE CYTAG DEST DEST2 DURN FREE14 ... \\\n", "0 0 57090 64 None 48 NaN PS_DUMP NONE 1 NaN ... \n", "1 0 16555 25 None 22 NaN NTOF NONE 1 NaN ... \n", "\n", " USER __record_timestamp__ __record_version__ acqStamp class \\\n", "0 ZERO 1667260813900000000 0 1667260813850000000 CPS.TGM \n", "1 TOF 1667260848700000000 0 1667260848650000000 CPS.TGM \n", "\n", " cyclestamp device property selector \\\n", "0 1667260813900000000 CPS.TGM FULL-TELEGRAM.STRC CPS.USER.ALL \n", "1 1667260848700000000 CPS.TGM FULL-TELEGRAM.STRC CPS.USER.ALL \n", "\n", " nxcals_entity_id \n", "0 46955 \n", "1 46955 \n", "\n", "[2 rows x 34 columns]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# limit on the Spark side so only two rows are collected to the driver\n", "tgmData.limit(2).toPandas()" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-----+\n", "| USER|count|\n", "+--------+-----+\n", "| MD4| 263|\n", "| LHCIND2| 212|\n", "| EAST1| 4205|\n", "| AD| 746|\n", "| LHC5| 70|\n", "| MD6| 276|\n", "|LHCPILOT| 64|\n", "| ION3| 1095|\n", "| ZERO|15940|\n", "| SFTPRO3| 3473|\n", "| ION1| 2168|\n", "| EAST3| 6691|\n", "| MD2| 297|\n", "| LHC3| 1205|\n", "| MD10| 93|\n", "| SFTPRO1| 1364|\n", "| TOF|13072|\n", "| MD5| 117|\n", "| SFTPRO2| 1921|\n", "+--------+-----+\n", "\n" ] } ], "source": [ "# Aggregate and count per user\n", "tgmData.groupBy(\"USER\").count().show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## 3) DataQuery example" ] },
{ "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# Define the extraction as a Spark DataFrame using DataQuery from the NXCALS API\n", "df = ( DataQuery.builder(spark).entities()\n", " .system(\"CMW\")\n", " .keyValuesEq({\"device\": \"LHC_STATS\", \"property\": \"Calculations\"})\n", " .timeWindow(\"2022-10-10 00:00:00.0\", \"2022-11-10 00:00:00.0\")\n", " .build()\n", " )" ] },
{ "cell_type": 
"code", "execution_count": 13, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- ALICE_BKGD1: double (nullable = true)\n", " |-- ALICE_BKGD2: double (nullable = true)\n", " |-- ALICE_BKGD3: double (nullable = true)\n", " |-- ALICE_BPTX_DELTA_T: double (nullable = true)\n", " |-- ALICE_HALF_CROSSING_ANGLE: double (nullable = true)\n", " |-- ALICE_LUMI_REGION_CENTROID_X: double (nullable = true)\n", " |-- ALICE_LUMI_REGION_CENTROID_Y: double (nullable = true)\n", " |-- ALICE_LUMI_REGION_CENTROID_Z: double (nullable = true)\n", " |-- ALICE_LUMI_REGION_SIZE_X: double (nullable = true)\n", " |-- ALICE_LUMI_REGION_SIZE_Y: double (nullable = true)\n", " |-- ALICE_LUMI_REGION_SIZE_Z: double (nullable = true)\n", " |-- ALICE_SPECTROMETER: double (nullable = true)\n", " |-- ALICE_TILT_H: double (nullable = true)\n", " |-- ALICE_TILT_V: double (nullable = true)\n", " |-- ATLAS_BKGD1: double (nullable = true)\n", " |-- ATLAS_BKGD2: double (nullable = true)\n", " |-- ATLAS_BKGD3: double (nullable = true)\n", " |-- ATLAS_BPTX_DELTA_T: double (nullable = true)\n", " |-- ATLAS_HALF_CROSSING_ANGLE: double (nullable = true)\n", " |-- ATLAS_LUMI_REGION_SIZE_X: double (nullable = true)\n", " |-- ATLAS_LUMI_REGION_SIZE_Y: double (nullable = true)\n", " |-- ATLAS_LUMI_REGION_SIZE_Z: double (nullable = true)\n", " |-- ATLAS_TILT_H: double (nullable = true)\n", " |-- ATLAS_TILT_V: double (nullable = true)\n", " |-- B1_AVG_BUNCH_INTENSITY: double (nullable = true)\n", " |-- B1_BUNCH_LENGTH_MEAN_EOF: double (nullable = true)\n", " |-- B1_BUNCH_LENGTH_MEAN_SSB: double (nullable = true)\n", " |-- B1_INTENSITY_ATLAS_CMS_SSB: double (nullable = true)\n", " |-- B1_INTENSITY_DCBCT_SSB: double (nullable = true)\n", " |-- B1_INTENSITY_SSB: double (nullable = true)\n", " |-- B1_NUMBER_BUNCHES: integer (nullable = true)\n", " |-- B1_PARTICLE_TYPE: string (nullable = true)\n", " |-- B2_AVG_BUNCH_INTENSITY: double (nullable = true)\n", 
" |-- B2_BUNCH_LENGTH_MEAN_EOF: double (nullable = true)\n", " |-- B2_BUNCH_LENGTH_MEAN_SSB: double (nullable = true)\n", " |-- B2_INTENSITY_ATLAS_CMS_SSB: double (nullable = true)\n", " |-- B2_INTENSITY_DCBCT_SSB: double (nullable = true)\n", " |-- B2_INTENSITY_SSB: double (nullable = true)\n", " |-- B2_NUMBER_BUNCHES: integer (nullable = true)\n", " |-- B2_PARTICLE_TYPE: string (nullable = true)\n", " |-- BETA_STAR_ALICE: double (nullable = true)\n", " |-- BETA_STAR_ATLAS: double (nullable = true)\n", " |-- BETA_STAR_CMS: double (nullable = true)\n", " |-- BETA_STAR_LHCB: double (nullable = true)\n", " |-- BR1_H_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR1_H_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR1_H_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR1_V_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR1_V_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR1_V_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR2_H_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR2_H_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR2_H_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR2_V_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR2_V_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR2_V_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR3_H_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR3_H_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR3_H_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR3_V_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR3_V_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR3_V_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR4_H_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR4_H_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR4_H_ROT_TAIL_AVG: double (nullable = true)\n", " |-- BR4_V_ROT_EMITTANCE_AVG: double (nullable = true)\n", " |-- BR4_V_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- BR4_V_ROT_TAIL_AVG: double (nullable = 
true)\n", " |-- BUNCH_SPACING_B1: integer (nullable = true)\n", " |-- CMS_BKGD1: double (nullable = true)\n", " |-- CMS_BKGD2: double (nullable = true)\n", " |-- CMS_BKGD3: double (nullable = true)\n", " |-- CMS_BPTX_DELTA_T: double (nullable = true)\n", " |-- CMS_HALF_CROSSING_ANGLE: double (nullable = true)\n", " |-- CMS_LUMI_REGION_SIZE_X: double (nullable = true)\n", " |-- CMS_LUMI_REGION_SIZE_Y: double (nullable = true)\n", " |-- CMS_LUMI_REGION_SIZE_Z: double (nullable = true)\n", " |-- CMS_TILT_H: double (nullable = true)\n", " |-- CMS_TILT_V: double (nullable = true)\n", " |-- CPS:BRIGHTNESS_AVG_FT: double (nullable = true)\n", " |-- CPS:BRIGHTNESS_AVG_INJ: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B1_H_EOF: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B1_H_SSB: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B1_V_EOF: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B1_V_SSB: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B2_H_EOF: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B2_H_SSB: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B2_V_EOF: double (nullable = true)\n", " |-- EMITTANCE_BSRT_B2_V_SSB: double (nullable = true)\n", " |-- EMITTANCE_FROM_LUMI_SSB: double (nullable = true)\n", " |-- ENERGY: double (nullable = true)\n", " |-- FILL_NUMBER: integer (nullable = true)\n", " |-- HEATLOAD_SSB_ARC12: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC23: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC34: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC45: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC56: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC67: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC78: double (nullable = true)\n", " |-- HEATLOAD_SSB_ARC81: double (nullable = true)\n", " |-- IS_HB: boolean (nullable = true)\n", " |-- IS_IONS: boolean (nullable = true)\n", " |-- IS_MD: boolean (nullable = true)\n", " |-- IS_PROTONS: boolean (nullable = true)\n", " |-- IS_PROTON_ION: boolean (nullable = 
true)\n", " |-- IS_STABLE_BEAMS: boolean (nullable = true)\n", " |-- IS_TS: boolean (nullable = true)\n", " |-- LHC:INJECTION_SCHEME: string (nullable = true)\n", " |-- LHCB_BKGD1: double (nullable = true)\n", " |-- LHCB_BKGD2: double (nullable = true)\n", " |-- LHCB_BKGD3: double (nullable = true)\n", " |-- LHCB_BPTX_DELTA_T: double (nullable = true)\n", " |-- LHCB_HALF_CROSSING_ANGLE: double (nullable = true)\n", " |-- LHCB_LUMI_REGION_SIZE_X: double (nullable = true)\n", " |-- LHCB_LUMI_REGION_SIZE_Y: double (nullable = true)\n", " |-- LHCB_LUMI_REGION_SIZE_Z: double (nullable = true)\n", " |-- LHCB_POLARITY: double (nullable = true)\n", " |-- LHCB_SPECTROMETER: double (nullable = true)\n", " |-- LHCB_TILT_H: double (nullable = true)\n", " |-- LHCB_TILT_V: double (nullable = true)\n", " |-- LHC_BEAM_TYPE: string (nullable = true)\n", " |-- LUMI_ALICE_DELIVERED: double (nullable = true)\n", " |-- LUMI_ALICE_PEAK: double (nullable = true)\n", " |-- LUMI_ATLAS_DELIVERED: double (nullable = true)\n", " |-- LUMI_ATLAS_PEAK: double (nullable = true)\n", " |-- LUMI_CMS_DELIVERED: double (nullable = true)\n", " |-- LUMI_CMS_PEAK: double (nullable = true)\n", " |-- LUMI_LHCB_DELIVERED: double (nullable = true)\n", " |-- LUMI_LHCB_PEAK: double (nullable = true)\n", " |-- LUMI_LIFETIME_ATLAS_EOF: double (nullable = true)\n", " |-- LUMI_LIFETIME_ATLAS_REL_ERROR_EOF: double (nullable = true)\n", " |-- LUMI_LIFETIME_ATLAS_REL_ERROR_SSB: double (nullable = true)\n", " |-- LUMI_LIFETIME_ATLAS_SSB: double (nullable = true)\n", " |-- LUMI_LIFETIME_CMS_EOF: double (nullable = true)\n", " |-- LUMI_LIFETIME_CMS_REL_ERROR_EOF: double (nullable = true)\n", " |-- LUMI_LIFETIME_CMS_REL_ERROR_SSB: double (nullable = true)\n", " |-- LUMI_LIFETIME_CMS_SSB: double (nullable = true)\n", " |-- LUMI_LIFETIME_EOF: double (nullable = true)\n", " |-- LUMI_LIFETIME_REL_ERROR_EOF: double (nullable = true)\n", " |-- LUMI_LIFETIME_REL_ERROR_SSB: double (nullable = true)\n", " |-- LUMI_LIFETIME_SSB: 
double (nullable = true)\n", " |-- NUMBER_COLLISIONS_IP1_5: integer (nullable = true)\n", " |-- NUMBER_COLLISIONS_IP2: integer (nullable = true)\n", " |-- NUMBER_COLLISIONS_IP8: integer (nullable = true)\n", " |-- PR_H_ROT_EMITTANCE_AVG_FT: double (nullable = true)\n", " |-- PR_H_ROT_EMITTANCE_AVG_INJ: double (nullable = true)\n", " |-- PR_H_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- PR_H_ROT_INTENSITY_AVG_FT: double (nullable = true)\n", " |-- PR_H_ROT_TAIL_AVG_FT: double (nullable = true)\n", " |-- PR_H_ROT_TAIL_AVG_INJ: double (nullable = true)\n", " |-- PR_V_ROT_EMITTANCE_AVG_FT: double (nullable = true)\n", " |-- PR_V_ROT_EMITTANCE_AVG_INJ: double (nullable = true)\n", " |-- PR_V_ROT_INTENSITY_AVG: double (nullable = true)\n", " |-- PR_V_ROT_INTENSITY_AVG_FT: double (nullable = true)\n", " |-- PR_V_ROT_TAIL_AVG_FT: double (nullable = true)\n", " |-- PR_V_ROT_TAIL_AVG_INJ: double (nullable = true)\n", " |-- PSB:BRIGHTNESS_AVG_R1: double (nullable = true)\n", " |-- PSB:BRIGHTNESS_AVG_R2: double (nullable = true)\n", " |-- PSB:BRIGHTNESS_AVG_R3: double (nullable = true)\n", " |-- PSB:BRIGHTNESS_AVG_R4: double (nullable = true)\n", " |-- SPS:BUNCH_BRIGHTNESS_AVG_IN: double (nullable = true)\n", " |-- SPS_H_ROT:EMITTANCE_AVG_FT: double (nullable = true)\n", " |-- SPS_H_ROT:EMITTANCE_AVG_INJ: double (nullable = true)\n", " |-- SPS_H_ROT_IN:BUNCH_INT: double (nullable = true)\n", " |-- SPS_H_ROT_IN:EMITTANCE: double (nullable = true)\n", " |-- SPS_H_ROT_IN:ENERGY: double (nullable = true)\n", " |-- SPS_H_ROT_IN:TAIL_POP: double (nullable = true)\n", " |-- SPS_V_ROT:EMITTANCE_AVG_FT: double (nullable = true)\n", " |-- SPS_V_ROT:EMITTANCE_AVG_INJ: double (nullable = true)\n", " |-- SPS_V_ROT_IN:BUNCH_INT: double (nullable = true)\n", " |-- SPS_V_ROT_IN:EMITTANCE: double (nullable = true)\n", " |-- SPS_V_ROT_IN:ENERGY: double (nullable = true)\n", " |-- SPS_V_ROT_IN:TAIL_POP: double (nullable = true)\n", " |-- TIME_DURATION_BM_ACCESS_NO_BEAM: long (nullable = 
true)\n", " |-- TIME_DURATION_BM_BEAM_IN: long (nullable = true)\n", " |-- TIME_DURATION_BM_MACHINE_SETUP: long (nullable = true)\n", " |-- TIME_DURATION_BM_RAMP_SQUEEZE: long (nullable = true)\n", " |-- TIME_DURATION_BM_STABLE: long (nullable = true)\n", " |-- TIME_DURATION_FILL: long (nullable = true)\n", " |-- TIME_DURATION_FTSQAD: long (nullable = true)\n", " |-- TIME_DURATION_INJECTION: long (nullable = true)\n", " |-- TIME_DURATION_RAMP: long (nullable = true)\n", " |-- TIME_DURATION_SETUP: long (nullable = true)\n", " |-- TIME_DURATION_STABLEBEAMS: long (nullable = true)\n", " |-- TIME_DURATION_TURNAROUND: long (nullable = true)\n", " |-- TIME_END_FILL: long (nullable = true)\n", " |-- TIME_END_STABLEBEAM: long (nullable = true)\n", " |-- TIME_START_FILL: long (nullable = true)\n", " |-- TIME_START_FLATTOP: long (nullable = true)\n", " |-- TIME_START_INJECTION: long (nullable = true)\n", " |-- TIME_START_STABLEBEAMS: long (nullable = true)\n", " |-- YEAR: long (nullable = true)\n", " |-- __record_timestamp__: long (nullable = true)\n", " |-- __record_version__: long (nullable = true)\n", " |-- acqStamp: long (nullable = true)\n", " |-- class: string (nullable = true)\n", " |-- cyclestamp: long (nullable = true)\n", " |-- device: string (nullable = true)\n", " |-- property: string (nullable = true)\n", " |-- selector: string (nullable = true)\n", " |-- nxcals_entity_id: long (nullable = true)\n", "\n" ] } ], "source": [ "df.printSchema()" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+---------+--------------------+\n", "| acqStamp| device| LUMI_ATLAS_PEAK|\n", "+-------------------+---------+--------------------+\n", "|1667905679423738525|LHC_STATS| null|\n", "|1667864783774363525|LHC_STATS| 3.5779993534088135|\n", "|1667822484470738525|LHC_STATS| null|\n", "|1667774087439988525|LHC_STATS| null|\n", "|1667703231948988525|LHC_STATS|0.002851842204108...|\n", "|1667629713050363525|LHC_STATS| 19311.009765625|\n", "|1667490492435738525|LHC_STATS| null|\n", "|1667321660221238525|LHC_STATS| 19136.162109375|\n", "|1667218881892238525|LHC_STATS| null|\n", "|1667152839888738525|LHC_STATS| null|\n", "|1667010181393738525|LHC_STATS| 19072.6171875|\n", "|1666650939478488525|LHC_STATS| 19087.2265625|\n", "|1666353810345738525|LHC_STATS| null|\n", "|1666340687922238525|LHC_STATS| 2665.25634765625|\n", "|1666339796457488525|LHC_STATS| null|\n", "|1666268334454488525|LHC_STATS| 18732.56640625|\n", "|1666263572235863525|LHC_STATS| null|\n", "|1666185949344738525|LHC_STATS| 2549.858154296875|\n", "|1666049957622238525|LHC_STATS| null|\n", "|1666016050093613525|LHC_STATS| null|\n", "+-------------------+---------+--------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "df.select(\"acqStamp\", \"device\", \"LUMI_ATLAS_PEAK\").show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### BPT downsampling" ] },
{ "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "from cern.nxcals.pyquery.builders import *\n", "# import only the Spark SQL functions used below, to avoid shadowing Python built-ins\n", "from pyspark.sql.functions import col, floor, timestamp_seconds, window\n", "\n", "sdf = DataQuery(spark).byVariables() \\\n", " .system('CMW') \\\n", " .startTime('2022-01-01 00:00:00.00') \\\n", " .endTime('2022-12-31 00:00:00.00') \\\n", " .variables(['LHC.BCTFR.A6R4.B1:BEAM_INTENSITY', 'LHC.BCTFR.A6R4.B2:BEAM_INTENSITY']) \\\n", " .buildDataset()\n" ] },
{ "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "# downsample: average the data over 1-hour bins\n", "# nxcals_timestamp is in nanoseconds; time_bin is the bin start in epoch seconds\n", "\n", "sdf_ds = ( sdf.withColumn('time_bin', floor(col('nxcals_timestamp') / 3600e9)*3600)\n", " .groupby(['nxcals_variable_name', 'time_bin']).mean('nxcals_value')\n", " .withColumnRenamed('avg(nxcals_value)', 'nxcals_value')\n", " .orderBy('time_bin')\n", " )" ] },
{ "cell_type": "code", "execution_count": 17, "metadata": {}, 
"outputs": [ { "data": { "text/plain": [ "16869" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sdf_ds.count()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------------------+----------+------------+\n", "|nxcals_variable_name |time_bin |nxcals_value|\n", "+--------------------------------+----------+------------+\n", "|LHC.BCTFR.A6R4.B2:BEAM_INTENSITY|1640995200|0.0 |\n", "|LHC.BCTFR.A6R4.B1:BEAM_INTENSITY|1640995200|0.0 |\n", "|LHC.BCTFR.A6R4.B2:BEAM_INTENSITY|1640998800|0.0 |\n", "+--------------------------------+----------+------------+\n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "sdf_ds.show(3, False)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+----------------+-------------------+--------------------+--------------------+\n", "| nxcals_value|nxcals_entity_id| nxcals_timestamp|nxcals_variable_name| time_bin|\n", "+--------------------+----------------+-------------------+--------------------+--------------------+\n", "|3.363701276527677...| 46424|1666656000010076000|LHC.BCTFR.A6R4.B2...|{2022-10-25 02:00...|\n", "|3.362054560455108E14| 46420|1666656001010080000|LHC.BCTFR.A6R4.B1...|{2022-10-25 02:00...|\n", "|3.363426610018308E14| 46424|1666656001010080000|LHC.BCTFR.A6R4.B2...|{2022-10-25 02:00...|\n", "+--------------------+----------------+-------------------+--------------------+--------------------+\n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "# Show another method for creating time bins\n", "\n", "sdf.withColumn('time_bin', window(timestamp_seconds(col('nxcals_timestamp')/1e9), '10 minutes')).show(3)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "# spark.stop()" ] }, { "cell_type": "code", "execution_count": null, 
"metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "sparkconnect": { "bundled_options": [ "NXCALS_NEW", "NXCALS" ], "list_of_options": [] } }, "nbformat": 4, "nbformat_minor": 2 }