{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\"EP-SFT\"\n", "\"Spark\"\n", "\n", "# Physics Analysis: Dimuon spectrum\n", "\n", "This tutorial shows you how to analyze datasets using `RDataFrame` from a Python notebook, offloading computations to a [Spark](https://spark.apache.org/) cluster. The example analysis performs the following steps:\n", "\n", "1. Connect a ROOT dataframe to a dataset containing 61 mio. events recorded by CMS in 2012\n", "2. Filter the events being relevant for your analysis\n", "3. Compute the invariant mass of the selected dimuon candidates\n", "4. Plot the invariant mass spectrum showing resonances up to the Z mass\n", "\n", "This material is based on the analysis done by Stefan Wunsch, available [here](http://opendata.web.cern.ch/record/12342) in CERN's Open Data portal.\n", "\n", "This notebook has been tested using the following configurations on CERN SWAN:\n", "\n", "* *Spark in local mode*: No cluster attached\n", "* *Spark cluster*: Cloud Containers (K8s)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prelude: Using a Distributed RDataFrame\n", "\n", "While ROOT is implemented in C++, it provides automatic Python bindings, called [PyROOT](https://root.cern/manual/python/). This means that all the ROOT C++ functionality is also available from Python, including `RDataFrame`." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Welcome to JupyROOT 6.26/08\n" ] } ], "source": [ "# This import gives access to all the ROOT C++ code from Python\n", "import ROOT" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In addition, there is a package called `PyRDF` - soon to be integrated in ROOT - that implements a layer on top of `RDataFrame`. `PyRDF` allows analyses written with `RDataFrame` to be executed on a set of distributed resources.\n", "\n", "\"Distributed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, we are going to use the [Spark](https://spark.apache.org/) backend of `PyRDF` to submit `RDataFrame` computations to an external Spark cluster. Note that the connection to the cluster needs to be done prior to the execution of this notebook (click on the `?` button on the top bar for help on how to connect to Spark clusters from SWAN)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import PyRDF\n", "\n", "# Select Spark backend, partition the dataset in 16 fragments when running distributedly\n", "PyRDF.use(\"spark\", {'npartitions': '16'})\n", "\n", "# Use RDataFrame through PyRDF\n", "from PyRDF import RDataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a ROOT dataframe\n", "\n", "The creation of the ROOT dataframe and the construction of the computation graph via `PyRDF` are done in the exact same way as with plain `RDataFrame` from Python. In practice, this means that the same code can be executed locally or distributedly just by switching the backend we would like to use.\n", "\n", "Let's see that in action. First we will create a ROOT dataframe that is connected to a dataset named `Events` stored in a ROOT file. The file is pulled in via [XRootD](http://xrootd.org/) from EOS public, but note how it could also be stored in your CERNBox space or in any other EOS repository accessible from SWAN (e.g. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create a ROOT dataframe\n", "\n", "The creation of the ROOT dataframe and the construction of the computation graph via `PyRDF` are done in the exact same way as with plain `RDataFrame` from Python. In practice, this means that the same code can be executed locally or distributedly just by switching the backend we would like to use.\n", "\n", "Let's see that in action. First we will create a ROOT dataframe that is connected to a dataset named `Events` stored in a ROOT file. The file is pulled in via [XRootD](http://xrootd.org/) from EOS public, but note how it could also be stored in your CERNBox space or in any other EOS repository accessible from SWAN (e.g. the experiment ones).\n", "\n", "The dataset `Events` is a `TTree` (to first order, a \"table\") and has the following branches (also referred to as \"columns\"):\n", "\n", "| Branch name | Data type | Description |\n", "|-------------|-----------|-------------|\n", "| `nMuon` | `unsigned int` | Number of muons in this event |\n", "| `Muon_pt` | `float[nMuon]` | Transverse momentum of the muons stored as an array of size `nMuon` |\n", "| `Muon_eta` | `float[nMuon]` | Pseudo-rapidity of the muons stored as an array of size `nMuon` |\n", "| `Muon_phi` | `float[nMuon]` | Azimuth of the muons stored as an array of size `nMuon` |\n", "| `Muon_charge` | `int[nMuon]` | Charge of the muons stored as an array of size `nMuon`, either -1 or 1 |\n", "| `Muon_mass` | `float[nMuon]` | Mass of the muons stored as an array of size `nMuon` |" ] },
{ "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "df = RDataFrame(\"Events\", \"root://eospublic.cern.ch//eos/opendata/cms/derived-data/AOD2NanoAODOutreachTool/Run2012BC_DoubleMuParked_Muons.root\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Run on the entire dataset\n", "\n", "The full dataset contains half a year of CMS data taking in 2012, 61 million events in total. That would take quite a while to process with just the few cores provided by the SWAN session itself. Luckily, we can offload our computations to a Spark cluster and get the result back for inspection in the notebook.\n", "\n", "Remember this rule of thumb:\n", "1. In the exploratory phase of your analysis, you can run your `RDataFrame` computation on a small subset of your input dataset. The resources of the SWAN session (a few cores and a few GB of RAM) should be enough for that purpose. At this stage, `RDataFrame::Range` comes in handy to execute on less data.\n", "2. When you are ready to run on the entire dataset, you can use `RDataFrame` via `PyRDF` and select the Spark backend. Since the computations are offloaded to a Spark cluster, you won't be overloading your SWAN session." ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# No need to select a subset of the dataset, we've got distributed resources!\n", "# df_range = df.Range(1000000)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Filter relevant events for this analysis\n", "\n", "Physics datasets are often general-purpose datasets and therefore need extensive filtering of the events for the actual analysis. Here, we implement only a simple selection based on the number of muons and their charge to cut the dataset down to the events that are relevant for our study.\n", "\n", "In particular, we apply two filters to keep:\n", "1. Events with exactly two muons\n", "2. Events with muons of opposite charge" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "df_2mu = df.Filter(\"nMuon == 2\", \"Events with exactly two muons\")\n", "df_oc = df_2mu.Filter(\"Muon_charge[0] != Muon_charge[1]\", \"Muons with opposite charge\")" ] },
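{ "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sanity check, we could count how many events survive each cut. The sketch below is a hypothetical addition, not part of the original analysis; it relies on the standard `RDataFrame` `Count` action. Since results are computed lazily, accessing any of the values would trigger one extra (distributed) event loop that fills all of the booked counters in a single pass:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: book lazy counters for a simple cutflow. Nothing runs until a\n", "# value is accessed; all booked results are filled in the same event loop.\n", "n_all = df.Count()\n", "n_2mu = df_2mu.Count()\n", "n_oc = df_oc.Count()\n", "# Uncomment to trigger the event loop and print the cutflow:\n", "# print(n_all.GetValue(), n_2mu.GetValue(), n_oc.GetValue())" ] },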
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Compute the invariant mass of the dimuon system\n", "\n", "Since we want to see the resonances in the mass spectrum, i.e. the mass values at which dimuon events are more likely, we need to compute the invariant mass from the four-vectors of the two muon candidates: $m_{\\mu\\mu} = \\sqrt{(E_1 + E_2)^2 - |\\vec{p}_1 + \\vec{p}_2|^2}$, with energies and momenta derived from the transverse momentum, pseudo-rapidity, azimuth and mass of each muon. Because this operation is non-trivial, we will use ROOT's `ROOT::VecOps::InvariantMass` function to do the job for us.\n", "\n", "The invariant mass will be stored in a new column that we will create with the `Define` operation of `RDataFrame`. The parameters of `Define` are the name of the new column and a string with the function to be invoked, which includes the dataset columns to be used as arguments of that function.\n", "\n", "Note how, even though `ROOT::VecOps::InvariantMass` is a C++ function, we can use it in our `RDataFrame` workflow from Python: the second parameter of `Define` is a string that contains C++ code. Such code will be just-in-time compiled by ROOT and called during the event loop in C++. This allows us to benefit from C++ performance in computationally-expensive code even when programming in Python." ] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "df_mass = df_oc.Define(\"Dimuon_mass\", \"ROOT::VecOps::InvariantMass(Muon_pt, Muon_eta, Muon_phi, Muon_mass)\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Make a histogram of the dimuon spectrum\n", "\n", "As (almost) always in physics, we have a look at the results in the form of a histogram. Let's book a histogram as one endpoint of our computation graph: 30000 bins between 0.25 and 300 GeV, fine enough to resolve the narrow low-mass resonances on the logarithmic axes used below." ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "nbins = 30000\n", "low = 0.25\n", "up = 300\n", "h = df_mass.Histo1D((\"Dimuon_mass\", \"Dimuon_mass\", nbins, low, up), \"Dimuon_mass\")" ] },
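{ "cell_type": "markdown", "metadata": {}, "source": [ "More endpoints can be booked on the same graph before the event loop runs, and all of them are filled in a single pass over the data. As a sketch (a hypothetical addition, not part of the original analysis), one could also book a linearly-binned zoom on the Z resonance region:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: a second histogram booked on the same graph, zooming on the Z\n", "# peak region; it would be filled in the same event loop as `h`\n", "h_zpeak = df_mass.Histo1D((\"Dimuon_mass_zoom\", \"Dimuon_mass_zoom\", 80, 70, 110), \"Dimuon_mass\")" ] },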
] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 0:=====================================================> (15 + 1) / 16]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 555 ms, sys: 109 ms, total: 664 ms\n", "Wall time: 39.3 s\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", " \r" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "\n", "ROOT.gStyle.SetOptStat(0); ROOT.gStyle.SetTextFont(42)\n", "c = ROOT.TCanvas(\"c\", \"\", 800, 700)\n", "c.SetLogx(); c.SetLogy()\n", "h.SetTitle(\"\")\n", "h.GetXaxis().SetTitle(\"m_{#mu#mu} (GeV)\"); h.GetXaxis().SetTitleSize(0.04)\n", "h.GetYaxis().SetTitle(\"N_{Events}\"); h.GetYaxis().SetTitleSize(0.04)\n", "h.Draw()\n", "\n", "label = ROOT.TLatex(); label.SetNDC(True)\n", "label.SetTextSize(0.040); label.DrawLatex(0.100, 0.920, \"#bf{CMS Open Data}\")\n", "label.SetTextSize(0.030); label.DrawLatex(0.500, 0.920, \"#sqrt{s} = 8 TeV, L_{int} = 11.6 fb^{-1}\");" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "ROOT provides interactive JavaScript graphics for Jupyter, which can be activated with the `%jsroot` magic. Click and drag on the axis to zoom in and double click to reset the view." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n", "
\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%jsroot on\n", "c.Draw()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" }, "sparkconnect": { "bundled_options": [], "list_of_options": [ { "name": "spark.executor.instances", "value": "4" }, { "name": "spark.executor.cores", "value": "4" } ] } }, "nbformat": 4, "nbformat_minor": 2 }