# coding: utf-8
"""
Tools for producing new array columns (e.g. high-level variables).
"""
from __future__ import annotations
import copy
import inspect
import law
from columnflow.columnar_util import TaskArrayFunction
from columnflow.util import DerivableMeta, UNSET
from columnflow.types import Callable, Sequence, Any, UNSET_TYPE
[docs]
class TaskArrayFunctionWithProducerRequirements(TaskArrayFunction):
require_producers: Sequence[str] | set[str] | None = None
def __init__(self, *args, **kwargs):
if "require_producers" in kwargs or self.__class__.require_producers is None:
kwargs["require_producers"] = kwargs.get("require_producers") or []
elif isinstance(self.__class__.require_producers, (list, tuple)):
kwargs["require_producers"] = copy.copy(self.__class__.require_producers)
super().__init__(*args, **kwargs)
def _req_producer(self, task: law.Task, producer: str) -> Any:
# hook to customize how required producers are requested
from columnflow.tasks.production import ProduceColumns
return ProduceColumns.req_other_producer(task, producer=producer)
[docs]
def requires_func(self, task: law.Task, reqs: dict, **kwargs) -> None:
super().requires_func(task=task, reqs=reqs, **kwargs)
# no requirements for workflows in pilot mode
if callable(getattr(task, "is_workflow", None)) and task.is_workflow() and getattr(task, "pilot", False):
return
# add required producers when set
if (prods := self.require_producers):
reqs["required_producers"] = {
prod: self._req_producer(task, prod)
for prod in law.util.make_unique(prods)
}
[docs]
def setup_func(
self,
task: law.Task,
reqs: dict,
inputs: dict,
reader_targets: law.util.InsertableDict,
**kwargs,
) -> None:
super().setup_func(task=task, reqs=reqs, inputs=inputs, reader_targets=reader_targets, **kwargs)
if "required_producers" in inputs:
for prod, inp in inputs["required_producers"].items():
reader_targets[f"required_producer_{prod}"] = inp["columns"]
[docs]
class Producer(TaskArrayFunctionWithProducerRequirements):
"""
Base class for all producers.
"""
exposed = True
# register attributes for arguments accepted by decorator
mc_only: bool = False
data_only: bool = False
[docs]
@classmethod
def producer(
cls,
func: Callable | None = None,
bases: tuple = (),
mc_only: bool | UNSET_TYPE = UNSET,
data_only: bool | UNSET_TYPE = UNSET,
require_producers: Sequence[str] | set[str] | None | UNSET_TYPE = UNSET,
**kwargs,
) -> DerivableMeta | Callable:
"""
Decorator for creating a new :py:class:`Producer` subclass with additional, optional *bases*
and attaching the decorated function to it as :py:meth:`~Producer.call_func`.
When *mc_only* (*data_only*) is *True*, the producer is skipped and not considered by
other calibrators, selectors and producers in case they are evaluated on a
:py:class:`order.Dataset` (using the :py:attr:`dataset_inst` attribute) whose ``is_mc``
(``is_data``) attribute is *False*.
All additional *kwargs* are added as class members of the new subclasses.
:param func: Function to be wrapped and integrated into new :py:class:`Producer` class.
:param bases: Additional bases for the new :py:class:`Producer`.
:param mc_only: Boolean flag indicating that this :py:class:`Producer` should only run on
Monte Carlo simulation and skipped for real data.
:param data_only: Boolean flag indicating that this :py:class:`Producer` should only run on
real data and skipped for Monte Carlo simulation.
:param require_producers: Sequence of names of other producers to add to the requirements.
:return: New :py:class:`Producer` subclass.
"""
def decorator(func: Callable) -> DerivableMeta:
# create the class dict
cls_dict = {**kwargs, "call_func": func}
if mc_only is not UNSET:
cls_dict["mc_only"] = mc_only
if data_only is not UNSET:
cls_dict["data_only"] = data_only
if require_producers is not UNSET:
cls_dict["require_producers"] = require_producers
# get the module name
frame = inspect.stack()[1]
module = inspect.getmodule(frame[0])
# get the producer name
cls_name = cls_dict.pop("cls_name", func.__name__)
# hook to update the class dict during class derivation
def update_cls_dict(cls_name, cls_dict, get_attr):
mc_only = get_attr("mc_only")
data_only = get_attr("data_only")
# optionally add skip function
if mc_only and data_only:
raise Exception(f"producer {cls_name} received both mc_only and data_only")
if (mc_only or data_only) and cls_dict.get("skip_func"):
raise Exception(
f"producer {cls_name} received custom skip_func, but either mc_only or data_only are set",
)
if "skip_func" not in cls_dict:
def skip_func(self, **kwargs) -> bool:
# check mc_only and data_only
if mc_only and not self.dataset_inst.is_mc:
return True
if data_only and not self.dataset_inst.is_data:
return True
# in all other cases, do not skip
return False
cls_dict["skip_func"] = skip_func
return cls_dict
cls_dict["update_cls_dict"] = update_cls_dict
# create the subclass
subclass = cls.derive(cls_name, bases=bases, cls_dict=cls_dict, module=module)
return subclass
return decorator(func) if func else decorator
# shorthand
producer = Producer.producer