columnflow.ml#

Definition of basic objects for describing and creating ML models.

Classes:

MLModel(analysis_inst, *[, parameters])

Minimal interface to ML models with connections to config objects (such as an order.Config or an order.Dataset) and, on an optional basis, to tasks.

class MLModel(analysis_inst, *, parameters=None, **kwargs)[source]#

Bases: Derivable

Minimal interface to ML models with connections to config objects (such as an order.Config or an order.Dataset) and, on an optional basis, to tasks.

Inheriting classes need to override eight methods:

  • sandbox()

  • datasets()

  • uses()

  • produces()

  • output()

  • open_model()

  • train()

  • evaluate()

See their documentation below for more info.

There are several optional hooks that allow for a custom setup after config objects were assigned (setup()), a fine-grained configuration of additional training requirements (requires()), diverging training and evaluation phase spaces (training_configs(), training_calibrators(), training_selector(), training_producers()), or control over how hyper-parameters are string encoded for output declarations (parameter_pairs()). The optional preparation_producer() allows setting a producer that is run during the initial preparation of ML columns.

classattribute single_config#

type: bool

The default flag that marks whether this model only accepts a single config object in case no value is passed in the constructor. Converted into an instance attribute upon instantiation.

classattribute folds#

type: int

The default number of folds for the k-fold cross-validation in case no value is passed in the constructor. Converted into an instance attribute upon instantiation.

classattribute store_name#

type: str, None

The default name for storing input data in case no value is passed in the constructor. When None, the name of the model class is used instead. Converted into an instance attribute upon instantiation.
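The way these class-level defaults turn into instance attributes can be sketched in plain Python. This is only an illustration, not the columnflow implementation: the class name `ModelSketch` and the constructor logic are assumptions, and only the attribute names and default values are taken from this page (`preparation_producer_in_ml_evaluation` is left out for brevity).

```python
class ModelSketch:
    # class-level defaults, as listed on this page
    single_config = False
    folds = 2
    store_name = None

    # names of attributes that can be overridden per instance
    init_attributes = ["single_config", "folds", "store_name"]

    def __init__(self, **kwargs):
        for attr in self.init_attributes:
            # use the constructor value if given, otherwise the class default
            setattr(self, attr, kwargs.get(attr, getattr(type(self), attr)))

        # fall back to the class name when no store name is set
        if self.store_name is None:
            self.store_name = type(self).__name__


default_model = ModelSketch()
custom_model = ModelSketch(folds=5)
```

With this pattern, `custom_model.folds` is 5 while the class-level default `ModelSketch.folds` stays at 2.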

analysis_inst#

type: order.Analysis

Reference to the order.Analysis object.

parameters#

type: OrderedDict

A dictionary mapping parameter names to arbitrary values, such as {"layers": 5, "units": 128}.

used_datasets#

type: dict read-only

Sets of order.Dataset instances that are used by the model training, mapped to their corresponding order.Config instances.

used_columns#

type: dict read-only

Column names or Route objects that are used by this model, mapped to the order.Config instances they belong to.

produced_columns#

type: dict read-only

Column names or Route objects that are produced by this model, mapped to the order.Config instances they belong to.

Attributes:

single_config

folds

store_name

preparation_producer_in_ml_evaluation

init_attributes

config_inst

accepts_scheduler_messages

Whether the training or evaluation loop expects and works with messages sent from a central luigi scheduler through the active worker to the underlying task.

used_columns

produced_columns

Helper function to resolve the names of columns produced with this MLModel instance.

used_datasets

Methods:

parameter_pairs([only_significant])

Returns a list of all parameter name-value tuples.

get_scheduler_messages(task)

Checks if the task obtained messages from a central luigi scheduler, parses them expecting key-value pairs, and returns them in an ordered DotDict.

setup()

Hook that is called after the model has been set up and its config_insts were assigned.

requires(task)

Returns tasks that are required for the training to run and whose outputs are needed.

training_configs(requested_configs)

Given a sequence of names of requested order.Config objects, requested_configs, this method can alter and/or replace them to define a different (set of) config(s) for the preprocessing and training pipeline.

training_calibrators(config_inst, ...)

Given a sequence of requested_calibrators for a config_inst, this method can alter and/or replace them to define a different set of calibrators for the preprocessing and training pipeline.

training_selector(config_inst, ...)

Given a requested_selector for a config_inst, this method can change it to define a different selector for the preprocessing and training pipeline.

training_producers(config_inst, ...)

Given a sequence of requested_producers for a config_inst, this method can alter and/or replace them to define a different set of producers for the preprocessing and training pipeline.

preparation_producer(config_inst)

This method allows setting a producer that can be called as part of the preparation of the ML input columns given a config_inst.

sandbox(task)

Given a task, returns the name of a sandbox that is needed to perform model training and evaluation.

datasets(config_inst)

Returns a set of all required datasets for a certain config_inst.

uses(config_inst)

Returns a set of all required columns for a certain config_inst.

produces(config_inst)

Returns a set of all produced columns for a certain config_inst.

output(task)

Returns a structure of output targets.

open_model(target)

Implements the opening of a trained model from target (corresponding to the structure returned by output()).

train(task, input, output)

Performs the creation and training of a model, being passed a task and its input and output.

evaluate(task, events, models, fold_indices)

Performs the model evaluation for a task on a chunk of events and returns them.

single_config: bool = False#
folds: int = 2#
store_name: str | None = None#
preparation_producer_in_ml_evaluation: bool = True#
init_attributes: list[str] = ['single_config', 'folds', 'store_name', 'preparation_producer_in_ml_evaluation']#
property config_inst: Config#
parameter_pairs(only_significant=False)[source]#

Returns a list of all parameter name-value tuples. In this context, significant parameters are those that potentially lead to different results (e.g. network architecture parameters as opposed to some log level).

Return type:

list[tuple[str, Any]]
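As a rough illustration of the idea, the sketch below filters out insignificant parameters and joins the remaining pairs into a string. It does not reproduce the columnflow implementation: the set `INSIGNIFICANT`, the helper `encode_pairs` and the `__` separator are assumptions made for this example.

```python
from collections import OrderedDict

# hypothetical hyper-parameters; "log_level" does not affect results
parameters = OrderedDict([("layers", 5), ("units", 128), ("log_level", "INFO")])

# assumption: names of parameters that are considered insignificant
INSIGNIFICANT = {"log_level"}


def parameter_pairs(parameters, only_significant=False):
    """Return name-value tuples, optionally dropping insignificant ones."""
    return [
        (name, value)
        for name, value in parameters.items()
        if not (only_significant and name in INSIGNIFICANT)
    ]


def encode_pairs(pairs):
    """Join name-value tuples into one string, e.g. for an output path."""
    return "__".join(f"{name}_{value}" for name, value in pairs)


encoded = encode_pairs(parameter_pairs(parameters, only_significant=True))
print(encoded)  # layers_5__units_128
```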

property accepts_scheduler_messages: bool#

Whether the training or evaluation loop expects and works with messages sent from a central luigi scheduler through the active worker to the underlying task. See get_scheduler_messages() for more info.

get_scheduler_messages(task)[source]#

Checks if the task obtained messages from a central luigi scheduler, parses them expecting key-value pairs, and returns them in an ordered DotDict. All values are KeyValueMessage objects (with key, value and respond() members).

Scheduler messages are only sent while the task is actively running, so it most likely only makes sense to expect and react to messages during training and evaluation loops.

Return type:

DotDict[str, KeyValueMessage]
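A training loop that reacts to such messages could look roughly like the sketch below. It is self-contained and uses stand-ins throughout: `MessageSketch` mimics the key, value and respond() members mentioned above, `poll_messages` takes the place of a call to get_scheduler_messages(), and the "stop" key is a hypothetical convention rather than something defined by columnflow.

```python
from dataclasses import dataclass, field


@dataclass
class MessageSketch:
    """Stand-in for a key-value message with a respond() method."""
    key: str
    value: str
    responses: list = field(default_factory=list)

    def respond(self, text):
        # a real message would send the response back to the scheduler
        self.responses.append(text)


def run_epochs(max_epochs, poll_messages):
    """Run up to max_epochs dummy epochs, stopping early on a "stop" message."""
    completed = 0
    for epoch in range(max_epochs):
        messages = poll_messages(epoch)
        stop = messages.get("stop")
        if stop is not None and stop.value.lower() == "true":
            stop.respond(f"stopping before epoch {epoch}")
            break
        # the actual training step for this epoch would go here
        completed += 1
    return completed


stop_message = MessageSketch("stop", "true")


def poll_messages(epoch):
    # pretend that a message arrives right before epoch 3 starts
    return {"stop": stop_message} if epoch == 3 else {}


completed = run_epochs(10, poll_messages)
```

Polling once per epoch keeps the overhead small while still letting the loop react within one epoch of a message arriving.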

property used_columns: dict[order.config.Config, set[columnflow.columnar_util.Route]]#
property produced_columns: dict[order.config.Config, set[columnflow.columnar_util.Route]]#

Helper function to resolve the names of columns produced with this MLModel instance.

Returns:

Sets of column names, mapped to the order.Config instances they belong to

property used_datasets: dict[order.config.Config, set[order.dataset.Dataset]]#
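The three properties used_columns, produced_columns and used_datasets share the same shape: a dictionary from config objects to sets. A minimal sketch of how such a mapping could be built from a per-config hook such as uses() is shown below. `ConfigSketch`, the column names and the free function `uses` are stand-ins invented for this example, and the conversion to Route objects is omitted.

```python
class ConfigSketch:
    """Stand-in for a config object; only carries a name."""

    def __init__(self, name):
        self.name = name


def uses(config_inst):
    # hypothetical column names required for any config
    return {"Jet.pt", "Jet.eta", "normalization_weight"}


config_insts = [ConfigSketch("config_a"), ConfigSketch("config_b")]

# one set of columns per config, keyed by the config object itself
used_columns = {config_inst: set(uses(config_inst)) for config_inst in config_insts}
```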
setup()[source]#

Hook that is called after the model has been set up and its config_insts were assigned.

Return type:

None

requires(task)[source]#

Returns tasks that are required for the training to run and whose outputs are needed.

Return type:

Any

training_configs(requested_configs)[source]#

Given a sequence of names of requested order.Config objects, requested_configs, this method can alter and/or replace them to define a different (set of) config(s) for the preprocessing and training pipeline. This can be helpful in cases where training and evaluation phase spaces, as well as the required input datasets and/or columns are intended to diverge.

Return type:

list[str]
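A possible override is sketched below in the same spirit as the examples for the other training hooks. The class is a plain stand-in rather than an MLModel subclass so that the snippet runs on its own, and the config names are hypothetical.

```python
from typing import Sequence


class TrainingConfigsSketch:
    """Stand-in for a model class; only the hook of interest is shown."""

    def training_configs(self, requested_configs: Sequence[str]) -> list[str]:
        # always train on one fixed config, regardless of what was requested
        return ["config_2017"]


model = TrainingConfigsSketch()
training = model.training_configs(["config_2017", "config_2018"])
```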

training_calibrators(config_inst, requested_calibrators)[source]#

Given a sequence of requested_calibrators for a config_inst, this method can alter and/or replace them to define a different set of calibrators for the preprocessing and training pipeline. This can be helpful in cases where training and evaluation phase spaces, as well as the required input columns are intended to diverge.

Example usage:

    def training_calibrators(
        self,
        config_inst: od.Config,
        requested_calibrators: Sequence[str],
    ) -> list[str]:
        # training uses a calibrator named "skip_jecunc"
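        return ["skip_jecunc"]

Parameters:
  • config_inst (Config) – Config instance for which the calibrators are requested

  • requested_calibrators (Sequence[str]) – Names of the calibrators requested for config_inst

Return type:

list[str]

Returns:

List with names of the calibrators to use for training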

training_selector(config_inst, requested_selector)[source]#

Given a requested_selector for a config_inst, this method can change it to define a different selector for the preprocessing and training pipeline. This can be helpful in cases where training and evaluation phase spaces, as well as the required input columns are intended to diverge.

Example usage:

    def training_selector(
        self,
        config_inst: od.Config,
        requested_selector: str,
    ) -> str:
        # training uses the default selector
        return "default"

Parameters:
  • config_inst (Config) – Config instance for which the selector is requested

  • requested_selector (str) – Name of the selector requested for config_inst

Return type:

str

Returns:

Name of the selector to use for training

training_producers(config_inst, requested_producers)[source]#

Given a sequence of requested_producers for a config_inst, this method can alter and/or replace them to define a different set of producers for the preprocessing and training pipeline. This can be helpful in cases where training and evaluation phase spaces, as well as the required input columns are intended to diverge.

Example usage:

    def training_producers(
        self,
        config_inst: od.Config,
        requested_producers: Sequence[str],
    ) -> list[str]:
        # training uses the default producer
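        return ["default"]

Parameters:
  • config_inst (Config) – Config instance for which the producers are requested

  • requested_producers (Sequence[str]) – Names of the producers requested for config_inst

Return type:

list[str]

Returns:

List with names of the producers to use for training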

preparation_producer(config_inst)[source]#

This method allows setting a producer that can be called as part of the preparation of the ML input columns given a config_inst.

Parameters:

config_inst (od.Config) – Config object for which the producer should run.

Return type:

str | None

Returns:

Name of a Producer class or None.
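A minimal sketch of such a hook is shown below. To keep it runnable on its own, it uses a stand-in config class and a plain class instead of an MLModel subclass; the config name "config_2017" and the producer name "ml_inputs" are hypothetical.

```python
from typing import Optional


class ConfigSketch:
    """Stand-in for a config object; only carries a name."""

    def __init__(self, name):
        self.name = name


class PreparationSketch:
    """Stand-in for a model class; only the hook of interest is shown."""

    def preparation_producer(self, config_inst) -> Optional[str]:
        # run an extra producer for one config only, none for all others
        if config_inst.name == "config_2017":
            return "ml_inputs"
        return None


model = PreparationSketch()
producer_2017 = model.preparation_producer(ConfigSketch("config_2017"))
producer_2018 = model.preparation_producer(ConfigSketch("config_2018"))
```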

abstract sandbox(task)[source]#

Given a task, returns the name of a sandbox that is needed to perform model training and evaluation.

Example usage:

    def sandbox(self, task: law.Task) -> str:
        return "bash::$HBT_BASE/sandboxes/venv_columnar_tf.sh"

Parameters:

task (Task) – Task instance for which the sandbox is requested

Return type:

str

Returns:

Path to the requested sandbox, optionally prefixed by the executing shell command with trailing :: as separator
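The structure of such a sandbox string can be illustrated with a small helper that splits the optional prefix from the path. `split_sandbox` is a hypothetical function written for this example and is not the parser used by law or columnflow.

```python
def split_sandbox(sandbox: str):
    """Split "<prefix>::<path>" into (prefix, path); prefix is None if absent."""
    prefix, separator, path = sandbox.partition("::")
    if not separator:
        # no "::" found, so the whole string is the path
        return None, sandbox
    return prefix, path


kind, path = split_sandbox("bash::$HBT_BASE/sandboxes/venv_columnar_tf.sh")
print(kind)  # bash
print(path)  # $HBT_BASE/sandboxes/venv_columnar_tf.sh
```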

abstract datasets(config_inst)[source]#

Returns a set of all required datasets for a certain config_inst. To be implemented in subclasses.

Example usage:

    def datasets(self, config_inst: od.Config) -> set[od.Dataset]:
        # normally you would pass this to the model via config and loop through these names ...
        all_datasets_names = self.datasets_name

        dataset_inst = []
        for dataset_name in all_datasets_names:
            dataset_inst.append(config_inst.get_dataset(dataset_name))

        # ... but you can also add one dataset by using its name
        dataset_inst.append(config_inst.get_dataset("tt_sl_powheg"))

        return set(dataset_inst)

Parameters:

config_inst (Config) – Config instance to extract the datasets from

Return type:

set[Dataset]

Returns:

Set with Dataset instances

abstract uses(config_inst)[source]#

Returns a set of all required columns for a certain config_inst. To be implemented in subclasses.

Example usage:

    def uses(self, config_inst: od.Config) -> set[Route | str]:
        columns = set(self.input_features) | set(self.target_features) | {"normalization_weight"}
        return columns

Parameters:

config_inst (Config) – Config instance to extract the required columns from

Return type:

set[Route]

Returns:

Set of required columns

abstract produces(config_inst)[source]#

Returns a set of all produced columns for a certain config_inst. To be implemented in subclasses.

Example usage:

    def produces(self, config_inst: od.Config) -> set[Route | str]:
        # mark columns that you don't want to be filtered out
        # preserve the network's prediction of a specific feature for each fold
        # cls_name would be the name of your model
        ml_predictions = {
            f"{self.cls_name}.fold{fold}.{feature}"
            for fold in range(self.folds)
            for feature in self.target_features
        }

        # save indices used to create the folds
        util_columns = {f"{self.cls_name}.fold_indices"}

        # combine all columns to a unique set
        preserved_columns = ml_predictions | util_columns
        return preserved_columns

Parameters:

config_inst (Config) – Config instance to extract the produced columns from

Return type:

set[Route]

Returns:

Set of produced columns

abstract output(task)[source]#

Returns a structure of output targets. To be implemented in subclasses.

Example usage:

    def output(self, task: law.Task) -> law.FileSystemDirectoryTarget:
        # needs to be given via config
        max_folds = self.folds
        current_fold = task.fold

        # create directory at task.target, if it does not exist
        target = task.target(f"mlmodel_f{current_fold}of{max_folds}", dir=True)
        return target

Parameters:

task (Task) – Task instance used to extract task-related information

Return type:

Any

Returns:

Instance of DirectoryTarget, containing the path to the directory.

abstract open_model(target)[source]#

Implements the opening of a trained model from target (corresponding to the structure returned by output()). To be implemented in subclasses.

Example usage:

    def open_model(self, target: law.FileSystemDirectoryTarget):
        # if a formatter exists use formatter
        # e.g. for keras models: target.load(formatter="tf_keras_model")
        loaded_model = tf.keras.models.load_model(target.path)
        return loaded_model

Parameters:

target (Any) – Instance of DirectoryTarget, containing the path to the directory holding the machine learning model.

Return type:

Any

Returns:

Machine learning model instance

abstract train(task, input, output)[source]#

Performs the creation and training of a model, being passed a task and its input and output. To be implemented in subclasses.

Example usage:

    def train(
        self,
        task: law.Task,
        input: dict[str, list[law.FileSystemFileTarget]],
        output: law.FileSystemDirectoryTarget,
    ) -> None:

        # use helper functions to define model, open input parquet files and prepare events
        # init a model structure
        model = self.build_model()

        # get data tensors
        events = self.open_input_files(input)
        input_tensor, target_tensor = self.prepare_events(events)

        # setup everything needed for training
        optimizer = tf.keras.optimizers.SGD()
        model.compile(
            optimizer,
            loss="mse",
            steps_per_execution=10,
        )

        # train, throw model_history away
        _ = model.fit(
            input_tensor,
            target_tensor,
            epochs=5,
            steps_per_epoch=10,
            validation_split=0.25,
        )

        # save your model and everything you want to keep
        output.dump(model, formatter="tf_keras_model")

Parameters:
  • task (Task) – Task instance used to extract task-related information

  • input (Any) – List of instances of DirectoryTarget, containing the paths of all required input files

  • output (Any) – Instance of DirectoryTarget, containing the path to the target directory of the task

Return type:

None

Returns:

None

abstract evaluate(task, events, models, fold_indices, events_used_in_training=False)[source]#

Performs the model evaluation for a task on a chunk of events and returns them. The list of models corresponds to the number of folds generated by this model. The already evaluated fold_indices for this event chunk might be used, depending on events_used_in_training. To be implemented in subclasses.

Example usage:

    def evaluate(
        self,
        task: law.Task,
        events: ak.Array,
        models: list[Any],
        fold_indices: ak.Array,
        events_used_in_training: bool = False,
    ) -> ak.Array:
        # prepare ml_models input features, target is not important
        inputs_tensor, _ = self.prepare_events(events)

        # do evaluation on all data
        # one can get test_set of fold using: events[fold_indices == fold]
        for fold, model in enumerate(models):
            # convert tf.tensor -> np.array -> ak.array
            # to_regular is necessary to make array contiguous (otherwise error)
            prediction = ak.to_regular(model(inputs_tensor).numpy())

            # update events with predictions, sliced by feature, and fold_indices for identification purpose
            for index_feature, target_feature in enumerate(self.target_features):
                events = set_ak_column(
                    events,
                    f"{self.cls_name}.fold{fold}.{target_feature}",
                    prediction[:, index_feature],
                )

        events = set_ak_column(
            events,
            f"{self.cls_name}.fold_indices",
            fold_indices,
        )

        return events

Parameters:
  • task (Task) – Task instance used to extract task related information

  • events (Array) – Awkward Array containing the events to evaluate

  • models (list[Any]) – List containing trained models

  • fold_indices (Array) – Awkward Array containing the indices of the folds used for training

  • events_used_in_training (bool, default: False) – Boolean flag to indicate if events were used during training

Return type:

Array

Returns:

Awkward array containing events with additional columns
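The role of fold_indices in a k-fold setup can be sketched with plain Python lists. The sketch assumes that the model for fold f is trained on all events outside fold f, so that evaluating an event with the model matching its fold index uses a model that has not seen it. That convention, the round-robin fold assignment and the stand-in models are assumptions made for this example and are not taken from the columnflow implementation.

```python
folds = 2

# hypothetical event values and their fold assignment (round-robin here)
events = [0.5, 1.0, 1.5, 2.0]
fold_indices = [i % folds for i in range(len(events))]


def training_events(fold):
    """Events a model for `fold` would train on: all events outside that fold."""
    return [x for x, f in zip(events, fold_indices) if f != fold]


def make_model(fold):
    """Stand-in for a trained network that tags its output with its fold."""
    return lambda x: (fold, 2 * x)


models = [make_model(fold) for fold in range(folds)]

# evaluate each event with the model that did not see it during training
predictions = [models[f](x) for x, f in zip(events, fold_indices)]
```

Writing out the predictions of every fold for every event, as in the example usage above, is the more general choice, since the selection by fold index can then be made later.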