feat: serve ROOT file URIs with ServiceX for CMS ttbar notebook (#103)
* updated ServiceX option
* get rid of unnecessary imports
* changed pipeline image
* added use_servicex to metrics tracking
* queries are titled

---------

Co-authored-by: ekauffma <elmaka8700@gmail.com>
ekauffma and ekauffma authored Feb 22, 2023
1 parent 8149680 commit e9d545e
Showing 4 changed files with 218 additions and 370 deletions.
295 changes: 123 additions & 172 deletions analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.ipynb

Large diffs are not rendered by default.

233 changes: 94 additions & 139 deletions analyses/cms-open-data-ttbar/ttbar_analysis_pipeline.py
@@ -6,7 +6,7 @@
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.14.4
# jupytext_version: 1.14.1
# kernelspec:
# display_name: Python 3 (ipykernel)
# language: python
@@ -34,7 +34,7 @@
# %% [markdown]
# ### Data pipelines
#
# To be a bit more precise, we are going to be looking at three different data pipelines:
# There are two possible pipelines: one with `ServiceX` enabled, and one using only `coffea` for processing.
# ![processing pipelines](utils/processing_pipelines.png)

# %% [markdown]
@@ -46,16 +46,15 @@
import os
import time

import vector; vector.register_awkward()

import awkward as ak
import cabinetry
from coffea import processor
from coffea.processor import servicex
from coffea.nanoevents import transforms
from coffea.nanoevents.methods import base, vector
from coffea.nanoevents.schemas.base import BaseSchema, zip_forms
from servicex import ServiceXDataset
from func_adl import ObjectStream
from func_adl_servicex import ServiceXSourceUpROOT
import hist
import json
import matplotlib.pyplot as plt
@@ -88,47 +87,26 @@
# | `-1` | 2269 | 3.59 TB |
#
# The input files are all in the 1–2 GB range.
#
# Some files are also rucio-accessible (with ATLAS credentials):
#
# | dataset | number of files | total size |
# | --- | --- | --- |
# | `user.ivukotic:user.ivukotic.ttbar__nominal` | 7066 | 1.46 TB |
# | `user.ivukotic:user.ivukotic.ttbar__scaledown` | 902 | 209 GB |
# | `user.ivukotic:user.ivukotic.ttbar__scaleup` | 917 | 191 GB |
# | `user.ivukotic:user.ivukotic.ttbar__ME_var` | 438 | 103 GB |
# | `user.ivukotic:user.ivukotic.ttbar__PS_var` | 443 | 100 GB |
# | `user.ivukotic:user.ivukotic.single_top_s_chan__nominal` | 114 | 11 GB |
# | `user.ivukotic:user.ivukotic.single_top_t_chan__nominal` | 2506 | 392 GB |
# | `user.ivukotic:user.ivukotic.single_top_tW__nominal` | 50 | 9 GB |
# | `user.ivukotic:user.ivukotic.wjets__nominal` | 10199 | 1.13 TB |
# | total | 22635 | 3.61 TB |
#
# The difference in total file size is presumably due to the different storage systems, which report slightly different sizes.
#
# When setting the `PIPELINE` variable below to `"servicex_databinder"`, the `N_FILES_MAX_PER_SAMPLE` variable is ignored and all files are processed.

# %%
### GLOBAL CONFIGURATION

# input files per process, set to e.g. 10 (smaller number = faster)
N_FILES_MAX_PER_SAMPLE = 5

# pipeline to use:
# - "coffea" for pure coffea setup
# - "servicex_processor" for coffea with ServiceX processor
# - "servicex_databinder" for downloading query output and subsequent standalone coffea
PIPELINE = "coffea"

# enable Dask (may not work yet in combination with ServiceX outside of coffea-casa)
# enable Dask
USE_DASK = True

# ServiceX behavior: ignore cache with repeated queries
# enable ServiceX
USE_SERVICEX = False

# ServiceX: ignore cache with repeated queries
SERVICEX_IGNORE_CACHE = False

# analysis facility: set to "coffea_casa" for coffea-casa environments, "EAF" for FNAL, "local" for local setups
AF = "coffea_casa"


### BENCHMARKING-SPECIFIC SETTINGS

# chunk size to use
@@ -149,6 +127,7 @@
# acceptable values are 4, 15, 25, 50 (corresponding to the percentage of the file read); 4% corresponds to the standard branches used in the notebook
IO_FILE_PERCENT = 4


# %% [markdown]
# ### Defining our `coffea` Processor
#
@@ -159,8 +138,6 @@
# - filling all the information into histograms that get aggregated and ultimately returned to us by `coffea`.
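
# %% [markdown]
# As a quick reference, the sketch below shows the bare `coffea` `ProcessorABC` interface that
# `TtbarAnalysis` implements: `process` receives one chunk of events and returns a dictionary of
# histograms, and `postprocess` runs once on the accumulated output. The class name, the binning,
# and the single jet-pT fill are illustrative only, not part of the actual analysis.

# %%
class MinimalProcessor(processor.ProcessorABC):
    """Minimal sketch of the ProcessorABC interface (illustrative only)."""

    def __init__(self):
        # one observable histogram with illustrative binning
        self.hist = hist.Hist.new.Reg(25, 0, 500, name="observable", label="observable [GeV]").Weight()

    def process(self, events):
        # events is one chunk of the input tree as awkward arrays
        # (field names such as events.jet.pt depend on the schema in use)
        histogram = self.hist.copy()
        histogram.fill(observable=ak.flatten(events.jet.pt), weight=1.0)
        return {"hist": histogram}

    def postprocess(self, accumulator):
        # nothing to do beyond the merging that coffea performs automatically
        return accumulator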

# %% tags=[]
processor_base = processor.ProcessorABC if (PIPELINE != "servicex_processor") else servicex.Analysis

# functions creating systematic variations
def flat_variation(ones):
# 2.5% weight variations
@@ -180,7 +157,7 @@ def jet_pt_resolution(pt):
return ak.unflatten(resolution_variation, counts)


class TtbarAnalysis(processor_base):
class TtbarAnalysis(processor.ProcessorABC):
def __init__(self, disable_processing, io_file_percent):
num_bins = 25
bin_low = 50
@@ -412,7 +389,7 @@ def behavior(self):
#
# Define the `func_adl` query used to extract the relevant columns and filter events.
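
# %% [markdown]
# For orientation, a `func_adl` query chains `Where` (event filtering) and `Select` (column
# extraction) on an `ObjectStream`. The sketch below is a simplified stand-in for the full query
# defined next; the cut and the branch names (`jet_pt`, `jet_eta`, `met`) are placeholders, not
# the actual selection.

# %%
def example_query(source: ObjectStream) -> ObjectStream:
    """Illustrative only: keep events with at least four jets and return a few columns."""
    return source.Where(lambda e: e.jet_pt.Count() >= 4).Select(
        lambda e: {"jet_pt": e.jet_pt, "jet_eta": e.jet_eta, "met": e.met}
    )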

# %% tags=[]
# %%
def get_query(source: ObjectStream) -> ObjectStream:
"""Query for event / column selection: >=4j >=1b, ==1 lep with pT>25 GeV, return relevant columns
"""
@@ -448,121 +425,101 @@ def get_query(source: ObjectStream) -> ObjectStream:


# %% [markdown]
# ### Standalone ServiceX for subsequent `coffea` processing
# ### Caching the queried datasets with `ServiceX`
#
# Using `servicex-databinder`, we can execute a query and download the output.
# As the files are currently accessible through `rucio` only with ATLAS credentials, you need to use an ATLAS ServiceX instance to run this (for example via the UChicago coffea-casa analysis facility).
# Using the query created with `func_adl`, `ServiceX` reads the CMS Open Data files and produces cached output files containing only the event information requested by the query.

# %%
if USE_SERVICEX:

# dummy dataset on which to generate the query
dummy_ds = ServiceXSourceUpROOT("cernopendata://dummy", "events", backend_name="uproot")

# tell low-level infrastructure not to contact ServiceX yet, only to
# return the qastle string it would have sent
dummy_ds.return_qastle = True

# create the query
query = get_query(dummy_ds).value()

# now we query the files and create a fileset dictionary containing the
# URLs pointing to the queried files

# %% tags=[]
if PIPELINE == "servicex_databinder":
from servicex_databinder import DataBinder
t0 = time.time()
for process in fileset.keys():
ds = ServiceXDataset(fileset[process]['files'],
backend_name="uproot",
ignore_cache=SERVICEX_IGNORE_CACHE)
files = ds.get_data_rootfiles_uri(query,
as_signed_url=True,
title=process)

import inspect

query_string = inspect.getsource(get_query).split("return source.")[-1] # extract query from function defined previously

sample_names = ["ttbar__nominal", "ttbar__scaledown", "ttbar__scaleup", "ttbar__ME_var", "ttbar__PS_var",
"single_top_s_chan__nominal", "single_top_t_chan__nominal", "single_top_tW__nominal", "wjets__nominal"]
sample_names = ["single_top_s_chan__nominal"] # for quick tests: small dataset with only 50 files
sample_list = []

for sample_name in sample_names:
sample_list.append({"Name": sample_name, "RucioDID": f"user.ivukotic:user.ivukotic.{sample_name}", "Tree": "events", "FuncADL": query_string})


databinder_config = {
"General": {
"ServiceXBackendName": "uproot",
"OutputDirectory": "outputs_databinder",
"OutputFormat": "root",
"IgnoreServiceXCache": SERVICEX_IGNORE_CACHE
},
"Sample": sample_list
}

sx_db = DataBinder(databinder_config)
# out = sx_db.deliver(timer=True)
parquet_paths = sx_db._sx.get_servicex_data() # only run transform, do not download as well
print(f"execution took {time.time() - t0:.2f} seconds")

# point to ROOT files from databinder
# update list of fileset files, pointing to ServiceX output for subsequent processing
# for process in fileset.keys():
# if out.get(process):
# fileset[process]["files"] = out[process]

# point directly to parquet files from databinder
# update paths to point to ServiceX outputs
for sample_name, sample_paths in zip([sample['Name'] for sample in databinder_config['Sample']], parquet_paths):
print(f"updating paths for {sample_name} with {len(sample_paths)} parquet files (e.g. {sample_paths[0]}")
fileset[sample_name]["files"] = sample_paths

fileset[process]["files"] = [f.url for f in files]

print(f"ServiceX data delivery took {time.time() - t0:.2f} seconds")

# %% [markdown]
# ### Execute the data delivery pipeline
#
# What happens here depends on the configuration setting for `PIPELINE`:
# - when set to `servicex_processor`, ServiceX will feed columns to `coffea` processors, which will asynchronously process them and accumulate the output histograms,
# - when set to `coffea`, processing will happen with pure `coffea`,
# - if `PIPELINE` was set to `servicex_databinder`, the input data has already been pre-processed and will be processed further with `coffea`.
# What happens here depends on the `USE_SERVICEX` flag. If it is set to true, the `coffea` processor runs over the files previously delivered by ServiceX and accumulates the output histograms.
#
# When `USE_SERVICEX` is false, the full input files are read and processed in this step instead.

# %%
if PIPELINE == "coffea":
if USE_DASK:
executor = processor.DaskExecutor(client=utils.get_client(AF))
else:
executor = processor.FuturesExecutor(workers=NUM_CORES)

run = processor.Runner(executor=executor, schema=AGCSchema, savemetrics=True, metadata_cache={}, chunksize=CHUNKSIZE)

filemeta = run.preprocess(fileset, treename="events") # pre-processing

t0 = time.monotonic()
all_histograms, metrics = run(fileset, "events", processor_instance=TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT)) # processing
exec_time = time.monotonic() - t0
all_histograms = all_histograms["hist"]

elif PIPELINE == "servicex_processor":
# in a notebook:
t0 = time.monotonic()
all_histograms = await utils.produce_all_histograms(fileset, get_query, TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT),
use_dask=USE_DASK, ignore_cache=SERVICEX_IGNORE_CACHE, schema=AGCSchema)
exec_time = time.monotonic() - t0

# as a script:
# async def produce_all_the_histograms():
# return await utils.produce_all_histograms(fileset, get_query, TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT),
# use_dask=USE_DASK, ignore_cache=SERVICEX_IGNORE_CACHE, schema=AGCSchema)
#
# all_histograms = asyncio.run(produce_all_the_histograms())

elif PIPELINE == "servicex_databinder":
# needs a slightly different schema, not currently implemented
raise NotImplementedError("further processing of this method is not currently implemented")
if USE_DASK:
executor = processor.DaskExecutor(client=utils.get_client(AF))
else:
executor = processor.FuturesExecutor(workers=NUM_CORES)

run = processor.Runner(executor=executor, schema=AGCSchema, savemetrics=True, metadata_cache={}, chunksize=CHUNKSIZE)

if USE_SERVICEX:
treename = "servicex"

else:
treename = "events"

filemeta = run.preprocess(fileset, treename=treename) # pre-processing

t0 = time.monotonic()
all_histograms, metrics = run(fileset, treename, processor_instance=TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT)) # processing
exec_time = time.monotonic() - t0

all_histograms = all_histograms["hist"]

print(f"\nexecution took {exec_time:.2f} seconds")

# %%
# track metrics for pure coffea setups
if PIPELINE == "coffea":
# update metrics
dataset_source = "/data" if fileset["ttbar__nominal"]["files"][0].startswith("/data") else "https://xrootd-local.unl.edu:1094" # TODO: xcache support
metrics.update({"walltime": exec_time, "num_workers": NUM_CORES, "af": AF_NAME, "dataset_source": dataset_source, "use_dask": USE_DASK,
"systematics": SYSTEMATICS, "n_files_max_per_sample": N_FILES_MAX_PER_SAMPLE, "pipeline": PIPELINE,
"cores_per_worker": CORES_PER_WORKER, "chunksize": CHUNKSIZE, "disable_processing": DISABLE_PROCESSING, "io_file_percent": IO_FILE_PERCENT})

# save metrics to disk
if not os.path.exists("metrics"):
os.makedirs("metrics")
timestamp = time.strftime('%Y%m%d-%H%M%S')
metric_file_name = f"metrics/{AF_NAME}-{timestamp}.json"
with open(metric_file_name, "w") as f:
f.write(json.dumps(metrics))

print(f"metrics saved as {metric_file_name}")
#print(f"event rate per worker (full execution time divided by NUM_CORES={NUM_CORES}): {metrics['entries'] / NUM_CORES / exec_time / 1_000:.2f} kHz")
print(f"event rate per worker (pure processtime): {metrics['entries'] / metrics['processtime'] / 1_000:.2f} kHz")
print(f"amount of data read: {metrics['bytesread']/1000**2:.2f} MB") # likely buggy: https://github.com/CoffeaTeam/coffea/issues/717
# track metrics
dataset_source = "/data" if fileset["ttbar__nominal"]["files"][0].startswith("/data") else "https://xrootd-local.unl.edu:1094" # TODO: xcache support
metrics.update({
"walltime": exec_time,
"num_workers": NUM_CORES,
"af": AF_NAME,
"dataset_source": dataset_source,
"use_dask": USE_DASK,
"use_servicex": USE_SERVICEX,
"systematics": SYSTEMATICS,
"n_files_max_per_sample": N_FILES_MAX_PER_SAMPLE,
"cores_per_worker": CORES_PER_WORKER,
"chunksize": CHUNKSIZE,
"disable_processing": DISABLE_PROCESSING,
"io_file_percent": IO_FILE_PERCENT
})

# save metrics to disk
if not os.path.exists("metrics"):
os.makedirs("metrics")
timestamp = time.strftime('%Y%m%d-%H%M%S')
metric_file_name = f"metrics/{AF_NAME}-{timestamp}.json"
with open(metric_file_name, "w") as f:
f.write(json.dumps(metrics))

print(f"metrics saved as {metric_file_name}")
#print(f"event rate per worker (full execution time divided by NUM_CORES={NUM_CORES}): {metrics['entries'] / NUM_CORES / exec_time / 1_000:.2f} kHz")
print(f"event rate per worker (pure processtime): {metrics['entries'] / metrics['processtime'] / 1_000:.2f} kHz")
print(f"amount of data read: {metrics['bytesread']/1000**2:.2f} MB") # likely buggy: https://github.com/CoffeaTeam/coffea/issues/717

# %% [markdown]
# ### Inspecting the produced histograms
@@ -693,5 +650,3 @@ def get_query(source: ObjectStream) -> ObjectStream:
# Please do not hesitate to get in touch if you would like to join the effort, or are interested in re-implementing (pieces of) the pipeline with different tools!
#
# Our mailing list is analysis-grand-challenge@iris-hep.org, sign up via the [Google group](https://groups.google.com/a/iris-hep.org/g/analysis-grand-challenge).

# %%
