Machine Learning#

In this section, you will learn how to implement machine learning in your analysis with columnflow.

How training happens in columnflow: k-fold cross validation#

Machine learning in columnflow is implemented such that k-fold cross validation is enabled by default. In k-fold cross validation, the dataset is split into k parts of equal size. For each training, k-1 parts are used for the actual training and the remaining part is used to test the model. This process is repeated k times, resulting in k trained model instances. At the end of the training, columnflow saves all k models, which are then usable for evaluation. An overview and further details about possible variations of k-fold cross validation can be found in the scikit-learn documentation.
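To illustrate the splitting scheme itself, here is a minimal sketch (independent of columnflow) using scikit-learn's KFold:

from sklearn.model_selection import KFold
import numpy as np

# stand-in for a dataset with 12 events
events = np.arange(12)

# k = 3: each event ends up in exactly one test part
kfold = KFold(n_splits=3)
for fold, (train_idx, test_idx) in enumerate(kfold.split(events)):
    print(f"fold {fold}: train on {events[train_idx]}, test on {events[test_idx]}")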

Configure your custom machine learning class:#

To create a custom machine learning (ML) class in columnflow, it is imperative to inherit from the MLModel class. This inheritance ensures the availability of functions to manage and access config and model instances, as well as the necessary producers. The name of your custom ML class can be arbitrary, since law accesses your machine learning model using the cls_name given in derive(), e.g.

test_model = TestModel.derive(cls_name="test_model", cls_dict=configuration_dict)

The second argument of derive() is a cls_dict, which configures your subclass. The cls_dict needs to be flat. The keys of the dictionary are set as class attributes and are therefore also accessible via self. Configuring with derive() has two main advantages:

  • manageability, since the dictionary can come from loading a config file, and such files can be changed fairly easily

  • flexibility, since multiple settings only require different configuration files

A possible configuration and model initialization for such a model could look like this:

hyperparameters = {
    "folds": 3,
    "epochs": 5,
}

# input and target features
configuration_dict = {
    "input_features": (
        "n_jet",
        "ht",
    ),
    "target_features": (
        "n_electron",
        "n_muon",
    ),
}

# combine configuration dictionary
configuration_dict.update(hyperparameters)

# init model instance with config dictionary
test_model = TestModel.derive(cls_name="test_model", cls_dict=configuration_dict)

One can also simply define class variables within the model. This can be useful for attributes that don’t change often, for example the datasets your model uses. Since these are also class attributes, they are accessible via self and are shared between all instances of this model.

class TestModel(MLModel):
    # shared between all model instances
    datasets_name: list = [
        "hh_ggf_bbtautau_madgraph",
        "tt_sl_powheg",
    ]

    def __init__(
            self,
            *args,
            folds: int | None = None,
            **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # your instance variables

If you have settings that should not be shared between all instances, define them within __init__.
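A minimal sketch, assuming the folds argument from the signature above is meant to override the class-level value for a single instance only:

    def __init__(
            self,
            *args,
            folds: int | None = None,
            **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # per-instance setting: only this instance sees the override,
        # other instances keep the shared class-level value
        if folds is not None:
            self.folds = folds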

After the configuration#

After the configuration of your ML model, law needs to be informed in the law.cfg about the existence of the new machine learning model. Add an ml_modules keyword under the [analysis] section of your law.cfg, pointing to the Python module where the model’s definition and derivation happen. These import paths are relative to the analysis root directory.

[analysis]
# any other config entries

# if you want to include multiple things from the same parent module, you can use a comma-separated list in {}
ml_modules: hbt.ml.{test}
inference_modules: hbt.inference.test

ABC functions#

In the following, we will go through several abstract functions that you must override in order to use your custom ML class with columnflow.

sandbox:#

In sandbox(), you specify which sandbox setup file should be sourced to set up the environment for ML usage. The return value of sandbox() is the path to your shell (sh) file, e.g.:

    def sandbox(self, task: law.Task) -> str:
        return "bash::$HBT_BASE/sandboxes/venv_columnar_tf.sh"

It is recommended to start the path with bash:: to indicate that the sandbox file should be sourced with bash. How to actually write the setup and requirement files can be found in the section about setting up a sandbox.

datasets:#

In the datasets() function, you specify which datasets are important for your machine learning model and which dataset instance(s) should be extracted from your config. To use this function, your datasets need to be added to your campaign, as defined by the Order module. An example can be found here. It is recommended to return the datasets as a set, to prevent double counting. In the following example, all datasets given by the external configuration are taken, and an additional dataset is added as well. Note how the name of each dataset is used to get a dataset instance from your config instance. This ensures that the correct dataset is used.

    def datasets(self, config_inst: od.Config) -> set[od.Dataset]:
        # normally you would pass the names to the model via the config and loop through them ...
        all_datasets_names = self.datasets_name

        dataset_insts = []
        for dataset_name in all_datasets_names:
            dataset_insts.append(config_inst.get_dataset(dataset_name))

        # ... but you can also add a single dataset by its name
        dataset_insts.append(config_inst.get_dataset("tt_sl_powheg"))

        return set(dataset_insts)

produces:#

By default, intermediate columns are not saved within the columnflow framework but are filtered out afterwards. If you want to prevent certain columns from being filtered out, you need to tell columnflow by letting produces() return the names of all columns that should be preserved, defined as strings within an iterable. More information can be found in the official documentation about producers.

In the following example, we tell columnflow to preserve the output of the neural network, but also the fold indices that are used to create the training and test folds. The input and target columns of each fold are not stored, to save disk space, since they are already stored in the training set parquet files. To avoid confusion: we are not producing the columns in this function, we only tell columnflow not to throw them away.

    def produces(self, config_inst: od.Config) -> set[Route | str]:
        # mark columns that you don't want to be filtered out
        # preserve the networks prediction of a specific feature for each fold
        # cls_name would be the name of your model
        ml_predictions = {
            f"{self.cls_name}.fold{fold}.{feature}"
            for fold in range(self.folds)
            for feature in self.target_features
        }

        # save indices used to create the folds
        util_columns = {f"{self.cls_name}.fold_indices"}

        # combine all columns to a unique set
        preserved_columns = ml_predictions | util_columns
        return preserved_columns
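For the example configuration above (a model derived as test_model with folds set to 3 and target features n_electron and n_muon), the returned set would contain:

{
    "test_model.fold0.n_electron", "test_model.fold0.n_muon",
    "test_model.fold1.n_electron", "test_model.fold1.n_muon",
    "test_model.fold2.n_electron", "test_model.fold2.n_muon",
    "test_model.fold_indices",
}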

Whether to preserve these columns is thus entirely your choice.

uses:#

In uses(), you define the columns that are needed by your machine learning model; these are forwarded to the ML model during the execution of the various tasks. In this case, we want to request the input and target features, as well as some weights:

    def uses(self, config_inst: od.Config) -> set[Route | str]:
        columns = set(self.input_features) | set(self.target_features) | {"normalization_weight"}
        return columns
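With the example configuration from above, this resolves to:

{"n_jet", "ht", "n_electron", "n_muon", "normalization_weight"}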

output:#

In the output() function, you define the local target directory that your current model instance will have access to.

Since machine learning in columnflow uses k-fold cross validation by default, it is a good idea to have a separate directory for each fold, and this should be reflected in the output() path. It is good practice to store your machine-learning files within the directory of the model instance. To get the path to this directory, use task.target(). In this example, we save each fold separately, e.g.:

    def output(self, task: law.Task) -> law.FileSystemDirectoryTarget:
        # needs to be given via config
        max_folds = self.folds
        current_fold = task.fold

        # create directory at task.target, if it does not exist
        target = task.target(f"mlmodel_f{current_fold}of{max_folds}", dir=True)
        return target

open_model:#

In the open_model() function, you implement the loading of the trained model and, if necessary, its configuration, so that it is ready to use for evaluate(). This does not define how the model is built for train().

The target parameter represents the local path to the model’s directory. In the following example, a TensorFlow model saved with the Keras API is loaded and returned, and no further configuration happens:

    def open_model(self, target: law.FileSystemDirectoryTarget):
        # if a formatter exists use formatter
        # e.g. for keras models: target.load(formatter="tf_keras_model")
        loaded_model = tf.keras.models.load_model(target.path)
        return loaded_model

train:#

In the train() function, you implement the initialization of your model and the training loop. The task corresponding to the model’s training is MLTraining. By default, train() has access to the locations of the model’s inputs and outputs.

In columnflow, k-fold cross validation is enabled by default. The self argument in train() refers to the instance of the current fold. Using self, you also have access to the entire analysis instance, the config instance of the current fold, and all the derived parameters of your model.

With this information, you can load and prepare the columns to be used by the model for training. In the following example, a very simple dummy training loop is performed using the Keras fit function. Within this function, some helper functions are used that are further explained in the following chapter about good practices.

    def train(
        self,
        task: law.Task,
        input: dict[str, list[law.FileSystemFileTarget]],
        output: law.FileSystemDirectoryTarget,
    ) -> None:

        # use helper functions to define model, open input parquet files and prepare events
        # init a model structure
        model = self.build_model()

        # get data tensors
        events = self.open_input_files(input)
        input_tensor, target_tensor = self.prepare_events(events)

        # setup everything needed for training
        optimizer = tf.keras.optimizers.SGD()
        model.compile(
            optimizer,
            loss="mse",
            steps_per_execution=10,
        )

        # train, throw model_history away
        _ = model.fit(
            input_tensor,
            target_tensor,
            epochs=5,
            steps_per_epoch=10,
            validation_split=0.25,
        )

        # save your model and everything you want to keep
        output.dump(model, formatter="tf_keras_model")
        return

Good practice for training:#

It is considered good practice to define helper functions for model building and column preprocessing and to call them in train(). The reasoning behind this design decision is that train() should focus on the definition of the actual training loop. Another reason is that especially the preparation of events can take many lines of code, making it messy to debug. The following examples show how to define such helper functions. First of all, one needs a function that handles the opening and combination of all parquet files.

    def open_input_files(self, inputs):
        # contains files from all datasets
        events_of_datasets = inputs["events"][self.config_inst.name]

        # get datasets names
        # datasets = [dataset.label for dataset in self.datasets(self.config_inst)]

        # extract all columns from parquet files for all datasets and stack them
        all_events = []
        for dataset, parquet_file_targets in events_of_datasets.items():
            for parquet_file_target in parquet_file_targets:
                parquet_file_path = parquet_file_target["mlevents"].path
                events = ak.from_parquet(parquet_file_path)
                all_events.append(events)

        all_events = ak.concatenate(all_events)
        return all_events

In prepare_events(), we use the combined awkward array and filter out all columns we are not interested in during training, to stay lightweight. Next, we split the remaining columns into input and target columns, bring them into the correct shape and data type, and apply certain preprocessing transformations. At the end, we convert the awkward array into a TensorFlow tensor, something our machine learning model can handle.

    def prepare_events(self, events):
        # helper function to extract events and prepare them for training

        column_names = set(events.fields)
        input_features = set(self.input_features)
        target_features = set(self.target_features)

        # remove columns not used in training
        to_remove_columns = list(column_names - (input_features | target_features))

        for to_remove_column in to_remove_columns:
            print(f"removing column {to_remove_column}")
            events = remove_ak_column(events, to_remove_column)

        # ml model can't work with awkward arrays
        # we need to convert them to tf.tensors
        # this is done by following step chain:
        # ak.array -> change type to uniform type -> np.recarray -> np.array -> tf.tensor

        # change dtype to uniform type
        events = ak.values_astype(events, "float32")
        # split data in inputs and target
        input_columns = [events[input_column] for input_column in self.input_features]
        target_columns = [events[target_column] for target_column in self.target_features]

        # convert ak.array -> np.array -> bring in correct shape
        input_data = ak.concatenate(input_columns).to_numpy().reshape(
            len(self.input_features), -1).transpose()
        target_data = ak.concatenate(target_columns).to_numpy().reshape(
            len(self.target_features), -1).transpose()
        return tf.convert_to_tensor(input_data), tf.convert_to_tensor(target_data)

The actual building of the model is also handled by a separate function.

    def build_model(self):
        # helper function to handle model building
        x = tf.keras.Input(shape=(2,))
        a1 = tf.keras.layers.Dense(10, activation="elu")(x)
        y = tf.keras.layers.Dense(2, activation="softmax")(a1)
        model = tf.keras.Model(inputs=x, outputs=y)
        return model

This small example already shows why it is good practice to separate the process into small chunks.

evaluate:#

While train() defines the training process of the ML model, evaluate() defines its evaluation. The corresponding task is MLEvaluation, which depends on MLTraining and will therefore trigger a training if no training was performed before.

For each of the k folds, a neural network model is trained; the models can be accessed via the models argument. The actual loading of the trained models stored in this list is defined in the open_model() function.

The awkward array events is loaded in chunks and contains the merged events of all folds. To filter out the test set, create a mask with fold_indices.
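A minimal sketch of such a mask, where fold is the index of the model whose test set you are interested in:

    # events with a matching fold index form the test set of this fold,
    # i.e. they were not seen during the training of this fold's model
    test_events = events[fold_indices == fold]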

If you want to preserve columns and write them out into a parquet file, append the columns to events using set_ak_column() and return events. All columns not present in produces() are then filtered out.

In the following example, the model’s predictions of the numbers of muons and electrons are saved for each fold, together with the fold indices; all other columns in events are thrown away, since they are not present in produces().

Don’t confuse this behavior with the parameter events_used_in_training. This flag determines whether a certain dataset and shift combination can be used by the task.

    def evaluate(
        self,
        task: law.Task,
        events: ak.Array,
        models: list[Any],
        fold_indices: ak.Array,
        events_used_in_training: bool = False,
    ) -> ak.Array:
        # prepare the ml model's input features; the targets are not needed here
        inputs_tensor, _ = self.prepare_events(events)

        # do evaluation on all data
        # one can get test_set of fold using: events[fold_indices == fold]
        for fold, model in enumerate(models):
            # convert tf.tensor -> np.array -> ak.array
            # to_regular is necessary to make the array contiguous (otherwise an error is raised)
            prediction = ak.to_regular(model(inputs_tensor).numpy())

            # update events with the predictions, sliced by feature, and the fold_indices for identification purposes
            for index_feature, target_feature in enumerate(self.target_features):
                events = set_ak_column(
                    events,
                    f"{self.cls_name}.fold{fold}.{target_feature}",
                    prediction[:, index_feature],
                )

        events = set_ak_column(
            events,
            f"{self.cls_name}.fold_indices",
            fold_indices,
        )

        return events

The evaluation’s output is saved as a parquet file with the name of the model as field name. To get the file’s path afterwards, rerun the same law command, but with --print-output 0 appended.
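For example, assuming the evaluation command from the next section was already run:

law run cf.MLEvaluation \
    --version your_current_version \
    --ml-model test_model_name \
    --config run2_2017_nano_uhh_v11_limited \
    --calibrators calibrator_name \
    --print-output 0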

Commands to start training and evaluation#

To start the machine learning training, use law run cf.MLTraining; for the evaluation, use law run cf.MLEvaluation. If the config, calibrators, selector or datasets are not set as defaults in your analysis, they have to be passed explicitly:

law run cf.MLTraining \
    --version your_current_version \
    --ml-model test_model_name \
    --config run2_2017_nano_uhh_v11_limited \
    --calibrators calibrator_name \
    --selector selector_name \
    --datasets dataset_1,dataset_2,dataset_3,...

law run cf.MLEvaluation \
    --version your_current_version \
    --ml-model test_model_name \
    --config run2_2017_nano_uhh_v11_limited \
    --calibrators calibrator_name

Most of these settings should sound familiar; if not, have a look at the corresponding tutorial. version defines a setup configuration of your ML task; think of it more as a label than an actual version. If you change the version label, columnflow will rerun all dependencies that are unique to this label, which typically just means that a new model is trained. You can then switch freely between both model versions.
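A sketch with two hypothetical version labels:

# train (or reuse) the model stored under the label v1
law run cf.MLTraining --version v1 --ml-model test_model_name ...

# an independent training under the label v2; switching back to
# --version v1 later reuses the already existing outputs
law run cf.MLTraining --version v2 --ml-model test_model_name ...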

Optional useful functions:#

Separate training and evaluation configuration for configs, calibrators, selector and producers:#

By default, the chosen configs, calibrators, selector and producers are used for both training and evaluation. Sometimes, however, one does not want to share the same environment, or does not need all the columns in the evaluation that were needed in the training. Another possible scenario is the usage of different selectors or datasets, in order to explore different phase spaces. This is where a separation of the two comes in handy.

To separate this behavior, one needs to define the training_{configs,calibrators,selector,producers} functions. These functions always take the config_inst as first parameter and the requested_{configs,calibrators,selector,producers} as second parameter. If such a function is defined, the evaluation will use the externally defined config, calibrator, selector or producer, while the training will use the one defined in the function.

In the following case, the training will use a fixed selector and producer called default, and a custom calibrator called skip_jecunc, while the calibrator for the evaluation is provided by the command that is run. Pay special attention to the grammatical number of the function names and, of course, to the type hints: the selector expects a single string, since typically only one selection is applied, while the calibrators and producers expect a sequence of strings. The custom calibrator skip_jecunc is of course defined within the law.cfg.

    def training_selector(
        self,
        config_inst: od.Config,
        requested_selector: str,
    ) -> str:
        # training uses the default selector
        return "default"

    def training_producers(
        self,
        config_inst: od.Config,
        requested_producers: Sequence[str],
    ) -> list[str]:
        # training uses the default producer
        return ["default"]

    def training_calibrators(
        self,
        config_inst: od.Config,
        requested_calibrators: Sequence[str],
    ) -> list[str]:
        # training uses a calibrator named "skip_jecunc"
        return ["skip_jecunc"]

setup:#

setup() is called at the end of __init__ of your ML model. Within this function, you can prepare operations that should stay open during the whole lifetime of your instance. A typical example would be the opening of a file, or dynamically adding variables that are only relevant to your ML model and nowhere else. In the following example, the variable definitions needed to plot quantities created by the ML model are added to the config_inst:

    def setup(self):
        # dynamically add variables for the quantities produced by this model
        if f"{self.cls_name}.n_muon" not in self.config_inst.variables:
            self.config_inst.add_variable(
                name=f"{self.cls_name}.n_muon",
                null_value=-1,
                binning=(4, -1.5, 2.5),
                x_title="Predicted number of muons",
            )
            self.config_inst.add_variable(
                name=f"{self.cls_name}.n_electron",
                null_value=-1,
                binning=(4, -1.5, 2.5),
                x_title="Predicted number of electrons",
            )
        return
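With these variables registered, the model’s outputs can be plotted like any other variable. A sketch, assuming columnflow’s standard plotting task cf.PlotVariables1D and the parameters from the commands above:

law run cf.PlotVariables1D \
    --version your_current_version \
    --ml-models test_model \
    --variables test_model.n_muon,test_model.n_electron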