# Source code for columnflow.selection.empty

# coding: utf-8

"""
Empty selectors that still produce the minimal set of columns potentially required in downstream
tasks.
"""

from collections import defaultdict

from columnflow.selection import Selector, SelectionResult, selector
from columnflow.selection.stats import increment_stats
from columnflow.production.processes import process_ids
from columnflow.production.cms.mc_weight import mc_weight
from columnflow.columnar_util import set_ak_column
from columnflow.util import maybe_import

np = maybe_import("numpy")
ak = maybe_import("awkward")


@selector(
    uses={process_ids, mc_weight, increment_stats},
    produces={process_ids, mc_weight, "category_ids"},
    exposed=True,
    # category ids to write for every event; resolved by the init hook when left at None
    category_ids=None,
    # name under which the inclusive category is looked up in the config
    inclusive_category_name="incl",
)
def empty(
    self: Selector,
    events: ak.Array,
    stats: defaultdict,
    **kwargs,
) -> tuple[ak.Array, SelectionResult]:
    """
    Pass-through selector that rejects no event and only adds the columns that downstream tasks
    may rely on.

    The ``process_ids`` producer is always invoked, ``mc_weight`` only for MC datasets, and a
    ``category_ids`` column is filled with ``self.category_ids`` for every event. Afterwards,
    ``increment_stats`` is called with event counts (plus MC weight sums for MC datasets), grouped
    per process id.

    :param events: The input events.
    :param stats: The statistics dictionary, forwarded to ``increment_stats``.
    :param **kwargs: Additional keyword arguments, forwarded unchanged to every invoked
        :py:class:`TaskArrayFunction`.
    :returns: A tuple of the events with the added columns and a :py:class:`SelectionResult`
        whose event mask is *True* for all events.
    """
    is_mc = self.dataset_inst.is_mc

    # process ids are attached for all datasets, corrected mc weights for simulation only
    events = self[process_ids](events, **kwargs)
    if is_mc:
        events = self[mc_weight](events, **kwargs)

    n_events = len(events)

    # every event receives the same list of category ids
    repeated_ids = np.array([self.category_ids] * n_events, dtype=np.int64)
    events = set_ak_column(events, "category_ids", repeated_ids)

    # trivial selection result: the event mask accepts everything
    accept_all = ak.Array(np.ones(n_events, dtype=bool))
    results = SelectionResult(event=accept_all)

    # quantities to accumulate in the stats
    weights = {
        "num_events": Ellipsis,
        "num_events_selected": Ellipsis,
    }
    if is_mc:
        weights.update({
            "sum_mc_weight": events.mc_weight,
            "sum_mc_weight_selected": (events.mc_weight, Ellipsis),
        })

    def _process_mask(v):
        # mask of events belonging to process id *v*
        return events.process_id == v

    # additionally split the accumulated quantities per process
    groups = {
        "process": {
            "values": events.process_id,
            "mask_fn": _process_mask,
        },
    }

    events, _ = self[increment_stats](
        events,
        results,
        stats,
        weight_map=weights,
        group_map=groups,
        **kwargs,
    )

    return events, results
@empty.init
def empty_init(self: Selector, **kwargs) -> None:
    """
    Init hook of the :py:func:`empty` selector that resolves ``category_ids`` when no hard-coded
    value was configured on class-level.

    The category named ``inclusive_category_name`` is looked up in the categories of the config
    first. If it is missing, a category with id 1 is used as a fallback.

    :raises ValueError: If neither the named inclusive category nor a category with id 1 exists.
    """
    super(empty, self).init_func(**kwargs)

    # only resolve the ids when they were not set explicitly
    if self.category_ids is None:
        categories = self.config_inst.categories
        incl_name = self.inclusive_category_name

        if incl_name in categories:
            # preferred: the inclusive category identified by its configured name
            self.category_ids = [categories.get(incl_name).id]
        elif 1 in categories:
            # fallback: a category with id 1
            self.category_ids = [1]
        else:
            raise ValueError(f"could not find inclusive category for {self.cls_name} selector")