Source code for columnflow.reduction.default
# coding: utf-8
"""
Reducer definition for achieving columnflow's default reduction behavior in three steps:
- remove unwanted events (using "event" mask of selection results)
- create new collections (using "objects" mapping of selection results)
- only keep certain columns after the reduction
"""
from __future__ import annotations
from collections import defaultdict
import law
from columnflow.reduction import Reducer, reducer
from columnflow.reduction.util import create_event_mask, create_collections_from_masks
from columnflow.util import maybe_import
ak = maybe_import("awkward")
[docs]
@reducer()
def cf_default_keep_columns(self: Reducer, events: ak.Array, selection: ak.Array, **kwargs) -> ak.Array:
    """
    Pass-through reducer whose call leaves *events* untouched.

    Its sole purpose is to declare, in a backwards-compatible manner, which columns are kept after the reduction. The
    declaration is driven by the "keep_columns" auxiliary config field, which used to be the default mechanism in
    earlier columnflow versions. *selection* and any further *kwargs* are accepted but not used.

    :param events: The event array, returned unchanged.
    :param selection: The selection results (unused).
    :return: The very same *events* object that was passed in.
    """
    return events
@cf_default_keep_columns.post_init
def cf_default_keep_columns_post_init(self: Reducer, task: law.Task, **kwargs) -> None:
    """
    Post-init hook that fills the produced columns from the "keep_columns" auxiliary config field.

    The entry registered under the family of *task* is looked up, using ``["*"]`` when the family has no entry. Each
    item is expanded through ``task._expand_keep_column`` and the result is merged into ``self.produces``.

    :param task: The task instance providing ``task_family`` and ``_expand_keep_column``.
    """
    super(cf_default_keep_columns, self).post_init_func(task=task, **kwargs)

    # column expressions configured for this task family, with a catch-all fallback
    keep_expressions = self.config_inst.x.keep_columns.get(task.task_family, ["*"])

    for expression in keep_expressions:
        expanded = task._expand_keep_column(expression)
        self.produces.update(expanded)
[docs]
@reducer(
    # the check for used columns is switched off
    check_used_columns=False,
    # when set, cf_default_keep_columns is added as a dependency to achieve backwards compatibility
    add_keep_columns=True,
    # when set, the shifts of the upstream selector are registered as shifts of this reducer
    mirror_selector_shifts=True,
)
def cf_default(self: Reducer, events: ak.Array, selection: ak.Array, task: law.Task, **kwargs) -> ak.Array:
    """
    Default reducer: drops unwanted events and, if object masks are present, creates new collections.

    An event mask is built from *selection* and ``task.selector_steps`` and applied to *events*. When *selection* has
    an "objects" field, the object masks of the surviving events are handed to
    :py:func:`create_collections_from_masks` together with the reduced events.

    :param events: The event array to reduce.
    :param selection: The selection results; an optional "objects" field holds the object masks.
    :param task: The invoking task, providing ``selector_steps``.
    :return: The reduced event array, with new collections added if object masks were available.
    """
    # mask of events passing the requested selector steps
    passed = create_event_mask(selection, task.selector_steps)
    reduced = events[passed]

    # without object masks there are no collections to create
    if "objects" not in selection.fields:
        return reduced

    # build collections from the object masks of the surviving events
    return create_collections_from_masks(reduced, selection.objects[passed])
@cf_default.init
def cf_default_init(self: Reducer, **kwargs) -> None:
    """
    Init hook of :py:func:`cf_default` that registers optional dependencies and shifts.

    - If ``add_keep_columns`` is set, ``cf_default_keep_columns.PRODUCES`` is added to both ``self.uses`` and
      ``self.produces``.
    - If ``mirror_selector_shifts`` is set and "selector_shifts" is contained in ``self.inst_dict``,
      ``self.selector_shifts`` is merged into ``self.shifts``.
    """
    super(cf_default, self).init_func(**kwargs)

    # optionally hook in the backwards-compatible keep-columns reducer on the input and output side
    if self.add_keep_columns:
        for dependencies in (self.uses, self.produces):
            dependencies.add(cf_default_keep_columns.PRODUCES)

    # optionally mirror the selector shifts, provided they are known to this instance
    should_mirror = self.mirror_selector_shifts and "selector_shifts" in self.inst_dict
    if should_mirror:
        self.shifts |= self.selector_shifts
@cf_default.post_init
def cf_default_post_init(self: Reducer, task: law.Task, **kwargs) -> None:
    """
    Post-init hook of :py:func:`cf_default` that extends the used columns once the task is known.

    Nothing is added when ``task.invokes_reducer`` is false. Otherwise the following entries end up in ``self.uses``:

    - ``"steps.<step>"`` for every step in ``task.selector_steps``,
    - ``"objects.<src>.<dst>"`` for every source / destination pair in ``task.collection_map`` whose destination
      matches one of the collections that are written,
    - the columns of the source collection that correspond to the written columns of the destination collection.

    :param task: The task instance providing ``invokes_reducer``, ``selector_steps`` and ``collection_map``.
    """
    super(cf_default, self).post_init_func(task=task, **kwargs)

    # updating the used columns is only required when the task invokes the reducer
    if not task.invokes_reducer:
        return

    # request the selection steps
    # (the "steps." prefix is what lets ReduceEvents decide to load them from selection result data)
    self.uses.update(f"steps.{step}" for step in task.selector_steps)

    # group the produced columns that have more than one element by their first element, which identifies the
    # collection that needs to be read in order to produce new collections
    # (masks must start with "objects." for ReduceEvents to decide to load them from selection result data)
    routes_by_collection = defaultdict(set)
    for route in self.produced_columns:
        if len(route) <= 1:
            continue
        routes_by_collection[route[0]].add(route)

    # walk through the collection mapping and update the used columns
    for src_name, dst_names in task.collection_map.items():
        for dst_name in dst_names:
            # only collections that are actually written need to be loaded
            if law.util.multi_match(dst_name, routes_by_collection.keys()):
                # the object mask itself
                self.uses.add(f"objects.{src_name}.{dst_name}")

                # the matching columns of the source collection
                self.uses.update(src_name + route[1:] for route in routes_by_collection[dst_name])