Source code for columnflow.tasks.framework.decorators

"""
Custom law task method decorators.
"""

import law

from columnflow import env_is_local
from columnflow.types import Any, Callable


@law.decorator.factory(accept_generator=True)
def only_local_env(
    fn: Callable,
    opts: Any,
    task: law.Task,
    *args: Any,
    **kwargs: Any,
) -> tuple[Callable, Callable, Callable]:
    """ only_local_env()
    Decorator guarding a task method so that it only runs when the module-level ``env_is_local`` flag is truthy.
    If the flag is falsy (e.g. inside a remote job), calling the decorated method raises a :py:class:`RuntimeError`
    instead of invoking it.

    :param fn: The decorated function.
    :param opts: Options for the decorator (not used by this decorator).
    :param task: The task instance.
    :param args: Arguments to be passed to the function call.
    :param kwargs: Keyword arguments to be passed to the function call.
    :return: A tuple containing the before_call, call, and after_call functions.
    :raises RuntimeError: From within *call*, when not running in a local environment.
    """
    def before_call() -> None:
        # nothing to prepare, no state is handed to the other hooks
        return None

    def call(state: Any) -> Any:
        # happy path first: forward to the wrapped method when running locally
        if env_is_local:
            return fn(task, *args, **kwargs)

        raise RuntimeError(f"{task.task_family}.{fn.__name__}() can only be executed locally")

    def after_call(state: Any) -> None:
        # nothing to clean up
        return None

    return before_call, call, after_call
@law.decorator.factory(callback=None, accept_generator=True)
def on_failure(
    fn: Callable,
    opts: Any,
    task: law.Task,
    *args: Any,
    **kwargs: Any,
) -> tuple[Callable, Callable, Callable]:
    """ on_failure(callback=None)
    A decorator that is configured with a callback function that is invoked if the decorated method raises an
    exception. The task instance is passed to the callback function as its only argument.

    The original exception is always re-raised after the callback was handled. If the callback itself raises an
    :py:class:`Exception`, that error is only reported through ``task.logger.error`` so that it cannot replace the
    original failure. When the configured callback is not callable (e.g. the default *None*), the exception is
    simply propagated.

    :param fn: The decorated function.
    :param opts: Options for the decorator. The ``"callback"`` entry holds the failure callback.
    :param task: The task instance.
    :param args: Arguments to be passed to the function call.
    :param kwargs: Keyword arguments to be passed to the function call.
    :return: A tuple containing the before_call, call, and after_call functions.
    """
    def before_call() -> None:
        return None

    def call(state: Any) -> Any:
        try:
            return fn(task, *args, **kwargs)
        except BaseException:
            # explicitly as broad as a bare "except" so that the callback is also triggered by interrupts and
            # exit requests; the bare "raise" below propagates the original exception unchanged in every case
            callback = opts["callback"]
            if callable(callback):
                try:
                    callback(task)
                except Exception as e:
                    # a broken callback must not hide the actual failure of the decorated method
                    task.logger.error(f"failure callback raised an exception: {e}")
            raise

    def after_call(state: Any) -> None:
        return None

    return before_call, call, after_call
@law.decorator.factory(accept_generator=True)
def view_output_plots(
    fn: Callable,
    opts: Any,
    task: law.Task,
    *args: Any,
    **kwargs: Any,
) -> tuple[Callable, Callable, Callable]:
    """ view_output_plots()
    Decorator that opens the plots written by a task after the decorated method has been called. The viewer command
    is read from the task's ``view_cmd`` attribute; when it is missing, empty or equal to ``law.NO_STR``, nothing is
    shown. All outputs of the task are flattened, target collections are unpacked, and every target whose absolute
    path ends with ``.pdf`` or ``.png`` is localized for reading and passed to the viewer command exactly once.

    :param fn: The decorated function.
    :param opts: Options for the decorator (not used by this decorator).
    :param task: The task instance.
    :param args: Arguments to be passed to the function call.
    :param kwargs: Keyword arguments to be passed to the function call.
    :return: A tuple containing the before_call, call, and after_call functions.
    """
    # file extensions that are considered viewable plots
    plot_suffixes = (".pdf", ".png")

    def before_call() -> None:
        return None

    def call(state: Any) -> Any:
        return fn(task, *args, **kwargs)

    def after_call(state: Any) -> None:
        cmd = getattr(task, "view_cmd", None)
        if not cmd or cmd == law.NO_STR:
            return

        # the command needs a placeholder for the file path, append one if there is none
        cmd_template = cmd if "{}" in cmd else cmd + " {}"

        # walk through all outputs in order, remembering paths that were shown already
        shown: set[str] = set()
        queue: list[Any] = law.util.flatten(task.output())
        while queue:
            target = queue.pop(0)

            # collections are not viewable themselves, enqueue their targets instead
            if isinstance(target, law.TargetCollection):
                queue.extend(target._flat_target_list)
                continue

            # skip targets without a path, non-plot files and duplicates
            path = getattr(target, "abspath", None)
            if not path or not path.endswith(plot_suffixes) or path in shown:
                continue
            shown.add(path)

            task.publish_message(f"showing {path}")
            with target.localize("r") as tmp:
                law.util.interruptable_popen(
                    cmd_template.format(tmp.abspath),
                    shell=True,
                    executable="/bin/bash",
                )

    return before_call, call, after_call