Source code for columnflow.production.cms.pileup

# coding: utf-8

"""
Column production methods related to pileup weights.
"""

import functools

import law

from columnflow.production import Producer, producer
from columnflow.util import maybe_import, DotDict
from columnflow.columnar_util import set_ak_column
from columnflow.types import Any

# numpy and awkward are obtained through columnflow's maybe_import helper instead of plain import
# statements
# NOTE(review): presumably this tolerates environments where the packages are missing; confirm
# against columnflow.util.maybe_import, since np.float32 is already accessed below at import time
np = maybe_import("numpy")
ak = maybe_import("awkward")


# helper: set_ak_column with the value_type keyword pre-bound to np.float32
set_ak_column_f32 = functools.partial(set_ak_column, value_type=np.float32)

# module-level logger named after this module
logger = law.logger.get_logger(__name__)


@producer(
    uses={"Pileup.nTrueInt"},
    produces={"pu_weight{,_minbias_xs_up,_minbias_xs_down}"},
    # only run on mc
    mc_only=True,
    # function to determine the correction file
    get_pileup_file=(lambda self, external_files: external_files.pu_sf),
)
def pu_weight(self: Producer, events: ak.Array, **kwargs) -> ak.Array:
    """
    Evaluates the corrector stored in :py:attr:`pileup_corrector` on the ``Pileup.nTrueInt`` column
    of *events* and stores the results as float32 columns.

    Three columns are written, one per value passed to the corrector as its ``"weights"`` input:
    ``pu_weight`` ("nominal"), ``pu_weight_minbias_xs_up`` ("up") and ``pu_weight_minbias_xs_down``
    ("down"). The updated *events* array is returned.
    """
    corrector = self.pileup_corrector
    n_true_int = events.Pileup.nTrueInt

    # output column -> value handed to the corrector as its "weights" input
    variations = {
        "pu_weight": "nominal",
        "pu_weight_minbias_xs_up": "up",
        "pu_weight_minbias_xs_down": "down",
    }

    for column_name, syst in variations.items():
        # arguments are looked up by the names the corrector declares for its inputs, in the
        # order in which it declares them
        available = {"NumTrueInteractions": n_true_int, "weights": syst}
        args = [available[inp.name] for inp in corrector.inputs]

        # evaluate and store the produced column
        events = set_ak_column_f32(events, column_name, corrector.evaluate(*args))

    return events
@pu_weight.requires
def pu_weight_requires(
    self: Producer,
    task: law.Task,
    reqs: dict[str, DotDict[str, Any]],
    **kwargs,
) -> None:
    """
    Registers the external files bundle required to derive the pileup weights in *reqs* under the
    key ``"external_files"``. An already existing entry with that key is left untouched.
    """
    if "external_files" not in reqs:
        # the task class is only imported when the requirement actually has to be created
        from columnflow.tasks.external import BundleExternalFiles

        reqs["external_files"] = BundleExternalFiles.req(task)


@pu_weight.setup
def pu_weight_setup(
    self: Producer,
    task: law.Task,
    reqs: dict[str, DotDict[str, Any]],
    inputs: dict[str, Any],
    reader_targets: law.util.InsertableDict,
    **kwargs,
) -> None:
    """
    Builds a correctionlib correction set from the file selected by ``get_pileup_file`` in the
    external files bundle and stores its single correction in the
    :py:attr:`pileup_corrector` attribute for simpler access in the actual callable.

    An :py:class:`Exception` is raised when the correction set does not contain exactly one
    correction.
    """
    bundle = reqs["external_files"]

    import correctionlib

    # alias __call__ to evaluate so that corrections can be called directly; note that this
    # assignment modifies the correctionlib class itself, not just the object created here
    correction_cls = correctionlib.highlevel.Correction
    correction_cls.__call__ = correction_cls.evaluate

    # the file content is loaded with the "gzip" formatter and decoded before parsing
    payload = self.get_pileup_file(bundle.files).load(formatter="gzip")
    correction_set = correctionlib.CorrectionSet.from_string(payload.decode("utf-8"))

    # exactly one correction is expected, which is then picked by its name
    names = list(correction_set.keys())
    if len(names) != 1:
        raise Exception("Expected exactly one type of pileup correction")

    self.pileup_corrector = correction_set[names[0]]
@producer(
    uses={"Pileup.nTrueInt"},
    produces={"pu_weight{,_minbias_xs_up,_minbias_xs_down}"},
    # only run on mc
    mc_only=True,
)
def pu_weights_from_columnflow(self: Producer, events: ak.Array, **kwargs) -> ak.Array:
    """
    Assigns each event pileup weights based on its ``Pileup.nTrueInt`` value, using the profile of
    pileup ratios stored in the :py:attr:`pu_weights` attribute, which is provided by the requires
    and setup functions of this producer.

    The lookup index is ``int(nTrueInt) - 1``, clamped to the range of available bins. Three float32
    columns are written, ``pu_weight``, ``pu_weight_minbias_xs_up`` and
    ``pu_weight_minbias_xs_down``, taken from the ``nominal``, ``minbias_xs_up`` and
    ``minbias_xs_down`` fields of :py:attr:`pu_weights`. The updated *events* array is returned.
    """
    # compute the indices for looking up weights
    indices = events.Pileup.nTrueInt.to_numpy().astype("int32") - 1

    # clamp to the valid bin range on both sides: values above the last bin use the last bin, and
    # values that would yield a negative index (nTrueInt truncating to 0) use the first bin instead
    # of being interpreted as "count from the end" and silently picking up the last bin
    # NOTE(review): mapping underflows to the first bin is the closest-bin choice; confirm that it
    # matches the binning used by the task that creates the weights
    max_bin = len(self.pu_weights) - 1
    indices = np.clip(indices, 0, max_bin)

    # save the weights
    events = set_ak_column_f32(events, "pu_weight", self.pu_weights.nominal[indices])
    events = set_ak_column_f32(events, "pu_weight_minbias_xs_up", self.pu_weights.minbias_xs_up[indices])
    events = set_ak_column_f32(events, "pu_weight_minbias_xs_down", self.pu_weights.minbias_xs_down[indices])

    return events
@pu_weights_from_columnflow.requires
def pu_weights_from_columnflow_requires(
    self: Producer,
    task: law.Task,
    reqs: dict[str, DotDict[str, Any]],
    **kwargs,
) -> None:
    """
    Registers the task that creates the pileup weights in *reqs* under the key ``"pu_weights"``.
    An already existing entry with that key is left untouched.
    """
    if "pu_weights" not in reqs:
        # the task class is only imported when the requirement actually has to be created
        from columnflow.tasks.cms.external import CreatePileupWeights

        reqs["pu_weights"] = CreatePileupWeights.req(task)


@pu_weights_from_columnflow.setup
def pu_weights_from_columnflow_setup(
    self: Producer,
    task: law.Task,
    reqs: dict[str, DotDict[str, Any]],
    inputs: dict[str, Any],
    reader_targets: law.util.InsertableDict,
    **kwargs,
) -> None:
    """
    Loads the pileup weights from the ``"pu_weights"`` entry of *inputs* with the "json" formatter,
    combines them with :py:func:`ak.zip` and saves the result in the :py:attr:`pu_weights`
    attribute for simpler access in the actual callable.
    """
    weight_profiles = inputs["pu_weights"].load(formatter="json")
    self.pu_weights = ak.zip(weight_profiles)