# coding: utf-8
"""
Electron related event weights.
"""
from __future__ import annotations
import dataclasses
import law
from columnflow.production import Producer, producer
from columnflow.util import maybe_import, load_correction_set, DotDict
from columnflow.columnar_util import set_ak_column, full_like, flat_np_view, layout_ak_array
from columnflow.types import Any, Callable
np = maybe_import("numpy")
ak = maybe_import("awkward")
[docs]
@dataclasses.dataclass
class ElectronSFConfig:
correction: str
campaign: str
working_point: str | dict[str, Callable] = ""
hlt_path: str = ""
min_pt: float = 0.0
max_pt: float = 0.0
def __post_init__(self) -> None:
if not self.working_point and not self.hlt_path:
raise ValueError("either working_point or hlt_path must be set")
if self.working_point and self.hlt_path:
raise ValueError("only one of working_point or hlt_path must be set")
if 0.0 < self.max_pt <= self.min_pt:
raise ValueError(f"{self.__class__.__name__}: max_pt must be larger than min_pt")
[docs]
@classmethod
def new(cls, obj: ElectronSFConfig | tuple[str, str, str]) -> ElectronSFConfig:
# purely for backwards compatibility with the old tuple format
if isinstance(obj, cls):
return obj
if isinstance(obj, list) or isinstance(obj, tuple) or isinstance(obj, set):
return cls(*obj)
if isinstance(obj, dict):
return cls(**obj)
raise ValueError(f"cannot convert {obj} to ElectronSFConfig")
[docs]
@producer(
uses={"Electron.{pt,eta,phi,deltaEtaSC}"},
# produces in the init
# only run on mc
mc_only=True,
# function to determine the correction file
get_electron_file=(lambda self, external_files: external_files.electron_sf),
# function to determine the electron weight config
get_electron_config=(lambda self: ElectronSFConfig.new(self.config_inst.x("electron_sf", self.config_inst.x("electron_sf_names", None)))), # noqa: E501
# choose if the eta variable should be the electron eta or the super cluster eta
use_supercluster_eta=True,
# name of the saved weight column
weight_name="electron_weight",
supported_versions={1, 2, 3, 4, 5},
)
def electron_weights(
self: Producer,
events: ak.Array,
electron_mask: ak.Array | type(Ellipsis) = Ellipsis,
**kwargs,
) -> ak.Array:
"""
Creates electron weights using the correctionlib. Requires an external file in the config under ``electron_sf``:
.. code-block:: python
cfg.x.external_files = DotDict.wrap({
"electron_sf": "/afs/cern.ch/work/m/mrieger/public/mirrors/jsonpog-integration-9ea86c4c/POG/EGM/2017_UL/electron.json.gz", # noqa
})
*get_electron_file* can be adapted in a subclass in case it is stored differently in the external files.
The name of the correction set, the year string for the weight evaluation, and the name of the working point should
be given as an auxiliary entry in the config:
.. code-block:: python
cfg.x.electron_sf = ElectronSFConfig(
correction="UL-Electron-ID-SF",
campaign="2017",
working_point="wp80iso", # for trigger weights use hlt_path instead
)
The *working_point* can also be a dictionary mapping working point names to functions that return a boolean mask for
the electrons. This is useful to compute scale factors for multiple working points at once, e.g. for the electron
reconstruction scale factors:
.. code-block:: python
cfg.x.electron_sf = ElectronSFConfig(
correction="Electron-ID-SF",
campaign="2022Re-recoE+PromptFG",
working_point={
"RecoBelow20": lambda variable_map: variable_map["pt"] < 20.0,
"Reco20to75": lambda variable_map: (variable_map["pt"] >= 20.0) & (variable_map["pt"] < 75.0),
"RecoAbove75": lambda variable_map: variable_map["pt"] >= 75.0,
},
)
*get_electron_config* can be adapted in a subclass in case it is stored differently in the config.
Optionally, an *electron_mask* can be supplied to compute the scale factor weight based only on a subset of
electrons.
"""
# fold electron mask with pt cuts if given
if self.electron_config.min_pt > 0.0:
pt_mask = events.Electron.pt >= self.electron_config.min_pt
electron_mask = pt_mask if electron_mask is Ellipsis else (pt_mask & electron_mask)
if self.electron_config.max_pt > 0.0:
pt_mask = events.Electron.pt <= self.electron_config.max_pt
electron_mask = pt_mask if electron_mask is Ellipsis else (pt_mask & electron_mask)
# prepare input variables
electrons = events.Electron[electron_mask]
eta = electrons.eta
if self.use_supercluster_eta:
eta = (
electrons.superclusterEta
if "superclusterEta" in electrons.fields
else electrons.eta + electrons.deltaEtaSC
)
variable_map = {
"year": self.electron_config.campaign,
"Path": self.electron_config.hlt_path,
"pt": electrons.pt,
"phi": electrons.phi,
"eta": eta,
}
# loop over systematics
for syst, postfix in zip(self.sf_variations, ["", "_up", "_down"]):
# get the inputs for this type of variation
variable_map_syst = variable_map | {"ValType": syst}
# add working point
wp = self.electron_config.working_point
if isinstance(wp, str):
# single wp, just evaluate
variable_map_syst_wp = variable_map_syst | {"WorkingPoint": wp}
inputs = [variable_map_syst_wp[inp.name] for inp in self.electron_sf_corrector.inputs]
sf = self.electron_sf_corrector.evaluate(*inputs)
elif isinstance(wp, dict):
# mapping of wps to masks, evaluate per wp and combine
sf_flat = flat_np_view(full_like(eta, 1.0))
for _wp, mask_fn in wp.items():
mask = mask_fn(variable_map)
variable_map_syst_wp = variable_map_syst | {"WorkingPoint": _wp}
# call the corrector with the masked inputs
inputs = [
(
variable_map_syst_wp[inp.name][mask]
if isinstance(variable_map_syst_wp[inp.name], (np.ndarray, ak.Array))
else variable_map_syst_wp[inp.name]
)
for inp in self.electron_sf_corrector.inputs
]
sf_flat[flat_np_view(mask)] = flat_np_view(self.electron_sf_corrector.evaluate(*inputs))
sf = layout_ak_array(sf_flat, eta)
else:
raise ValueError(f"unsupported working point type {type(variable_map['WorkingPoint'])}")
# create the product over all electrons in one event
weight = ak.prod(sf, axis=1, mask_identity=False)
# store it
events = set_ak_column(events, f"{self.weight_name}{postfix}", weight, value_type=np.float32)
return events
@electron_weights.init
def electron_weights_init(self: Producer, **kwargs) -> None:
super(electron_weights, self).init_func(**kwargs)
# add the product of nominal and up/down variations to produced columns
self.produces.add(f"{self.weight_name}{{,_up,_down}}")
@electron_weights.requires
def electron_weights_requires(
self: Producer,
task: law.Task,
reqs: dict[str, DotDict[str, Any]],
**kwargs,
) -> None:
super(electron_weights, self).requires_func(task=task, reqs=reqs, **kwargs)
if "external_files" in reqs:
return
from columnflow.tasks.external import BundleExternalFiles
reqs["external_files"] = BundleExternalFiles.req(task)
@electron_weights.setup
def electron_weights_setup(
self: Producer,
task: law.Task,
reqs: dict[str, DotDict[str, Any]],
inputs: dict[str, Any],
reader_targets: law.util.InsertableDict,
**kwargs,
) -> None:
super(electron_weights, self).setup_func(
task=task,
reqs=reqs,
inputs=inputs,
reader_targets=reader_targets,
**kwargs,
)
self.electron_config = self.get_electron_config()
# load the corrector
e_file = self.get_electron_file(reqs["external_files"].files)
self.electron_sf_corrector = load_correction_set(e_file)[self.electron_config.correction]
# the ValType key accepts different arguments for efficiencies and scale factors
if self.electron_config.correction.endswith("Eff"):
self.sf_variations = ["nom", "up", "down"]
else:
self.sf_variations = ["sf", "sfup", "sfdown"]
# check versions
if self.supported_versions and self.electron_sf_corrector.version not in self.supported_versions:
raise Exception(f"unsupported electron sf corrector version {self.electron_sf_corrector.version}")
# custom electron weight that runs trigger SFs
electron_trigger_weights = electron_weights.derive(
"electron_trigger_weights",
cls_dict={
"get_electron_file": (lambda self, external_files: external_files.electron_trigger_sf),
"get_electron_config": (lambda self: self.config_inst.x.electron_trigger_sf_names),
"use_supercluster_eta": False,
"weight_name": "electron_trigger_weight",
},
)
[docs]
@producer(
uses={"Electron.{pt,phi,eta,deltaEtaSC}"},
produces={"Electron.superclusterEta"},
)
def electron_sceta(self, events: ak.Array, **kwargs) -> ak.Array:
"""
Returns the electron super cluster eta.
"""
sc_eta = events.Electron.eta + events.Electron.deltaEtaSC
events = set_ak_column(events, "Electron.superclusterEta", sc_eta, value_type=np.float32)
return events