Source code for columnflow.tasks.cms.external

# coding: utf-8

"""
CMS related tasks dealing with external data.
"""

from __future__ import annotations

__all__ = []

import os
import glob

import luigi
import law

from columnflow.types import Sequence
from columnflow.tasks.framework.base import Requirements, AnalysisTask, ConfigTask, wrapper_factory
from columnflow.tasks.external import BundleExternalFiles
from columnflow.util import safe_div


logger = law.logger.get_logger(__name__)


class CreatePileupWeights(ConfigTask):
    """
    Task that computes pileup weights as the bin-wise ratio of the normalized pileup profile in data
    to the normalized pileup profile in MC. Weights are computed for the ``"nominal"``,
    ``"minbias_xs_up"`` and ``"minbias_xs_down"`` shifts and written to a single json file, mapping
    each shift name to its list of weights.

    The task runs in a sandbox and stages all inputs in and all outputs out.
    """

    task_namespace = "cf.cms"

    single_config = True

    data_mode = luigi.ChoiceParameter(
        default="hist",
        choices=["hist", "pileupcalc"],
        description="the mode to obtain the data pu profile; choices: 'hist', 'pileupcalc'; "
        "default: 'hist'",
    )
    version = None

    sandbox = "bash::$CF_BASE/sandboxes/cmssw_default.sh"

    # upstream requirements
    reqs = Requirements(
        BundleExternalFiles=BundleExternalFiles,
    )

    def requires(self):
        """
        Requires the bundle of external files, which provides the pileup related inputs.
        """
        return self.reqs.BundleExternalFiles.req(self)

    def output(self):
        """
        Returns the target of the json file with the weights; its name encodes the *data_mode*.
        """
        return self.target(f"weights_from_{self.data_mode}.json")

    def sandbox_stagein(self):
        # stagein all inputs
        return True

    def sandbox_stageout(self):
        # stageout all outputs
        return True

    @law.decorator.log
    @law.decorator.safe_output
    def run(self):
        """
        Reads the MC profile and, per shift, the data profile, builds their bin-wise ratio with
        :py:func:`safe_div` and dumps all weights into the json output.

        :raises Exception: If the MC profile and a data profile have different numbers of bins.
        """
        # prepare the external files and the output weights
        externals = self.requires()
        weights = {}

        # since this task uses stage-in into and stage-out from the sandbox,
        # prepare external files with the staged-in inputs
        externals.get_files_collection(self.input())

        # read the mc profile
        mc_profile = self.read_mc_profile_from_cfg(externals.files.pu.mc_profile)

        # loop over nominal and minbias_xs shifts
        for shift in ["nominal", "minbias_xs_up", "minbias_xs_down"]:
            # read or create the data profile
            if self.data_mode == "hist":
                pu_hist_target = externals.files.pu.data_profile[shift]
                data_profile = self.read_data_profile_from_hist(pu_hist_target)
            else:  # "pileupcalc"
                pu_file_target = externals.files.pu.json
                mb_xs = self.config_inst.x.minbias_xs.get(shift)
                data_profile = self.read_data_profile_from_pileupcalc(pu_file_target, mb_xs)

            # the ratio is built bin by bin, so both profiles need the same binning
            if len(mc_profile) != len(data_profile):
                raise Exception(
                    f"the number of bins in the MC profile ({len(mc_profile)}) does not match that "
                    f"of the data profile for the {shift} shift ({len(data_profile)})",
                )

            # build the data / mc ratios and store them
            weights[shift] = [safe_div(d, m) for m, d in zip(mc_profile, data_profile)]
            self.publish_message(f"computed pu weights for shift {shift}")

        # save it
        self.output().dump(weights, formatter="json")

    @classmethod
    def read_mc_profile_from_cfg(
        cls,
        pu_config_target: law.FileSystemTarget,
    ) -> list[float]:
        """
        Takes a mc pileup configuration file stored in *pu_config_target*, parses its content and
        returns the pu profile as a list of float probabilities, normalized to unit sum.

        Parsing starts at the first line beginning with ``mix.input.nbPileupEvents.probValue``,
        takes the comma-separated numbers after the opening parenthesis and stops at the first line
        containing a closing parenthesis. If no such start line exists, an empty list is returned.
        """
        probs = []

        # strip surrounding whitespace of all lines (empty lines are skipped in the loop below)
        lines = map(lambda s: s.strip(), pu_config_target.load(formatter="text").split("\n"))

        # loop through them and extract probabilities
        found_prob_line = False
        for line in lines:
            # look for the start of the pu profile
            if not found_prob_line:
                if not line.startswith("mix.input.nbPileupEvents.probValue"):
                    continue
                found_prob_line = True
                # only keep what follows the opening parenthesis on the start line
                line = line.split("(", 1)[-1].strip()

            # skip empty lines
            if not line:
                continue

            # extract numbers, ignoring empty and whitespace-only tokens (e.g. caused by a trailing
            # ", " in front of the closing parenthesis) which float() would reject
            tokens = (token.strip() for token in line.split(")", 1)[0].split(","))
            probs.extend(float(token) for token in tokens if token)

            # look for the end of the pu profile
            if ")" in line:
                break

        return cls.normalize_values(probs)

    @classmethod
    def read_data_profile_from_hist(
        cls,
        pu_hist_target: law.FileSystemTarget,
    ) -> list[float]:
        """
        Takes the pileup profile in data preproduced by the lumi pog and stored in *pu_hist_target*,
        reads the bin values of the contained ``"pileup"`` histogram and returns them as a list,
        normalized to unit sum.
        """
        hist_file = pu_hist_target.load(formatter="uproot")
        probs = hist_file["pileup"].values().tolist()
        return cls.normalize_values(probs)

    @classmethod
    def read_data_profile_from_pileupcalc(
        cls,
        pu_file_target: law.FileSystemTarget,
        minbias_xs: float,
    ) -> list[float]:
        """
        Intended to take the pileup file stored in *pu_file_target*, which should have been produced
        when processing data, and a *minbias_xs* value in mb (milli), and to return the normalized
        pileup profile in data as a list for 99 bins (recommended number).

        This mode is not implemented yet, so running the task with ``data_mode="pileupcalc"`` fails.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

    @classmethod
    def normalize_values(cls, values: Sequence[float]) -> list[float]:
        """
        Returns a new list with all *values* divided by their sum. An empty sequence results in an
        empty list. Non-empty *values* of plain floats that sum up to zero lead to a
        ``ZeroDivisionError``.
        """
        _sum = sum(values, 0.0)
        return [value / _sum for value in values]
# wrapper task that triggers CreatePileupWeights for multiple configs, selected via the "configs"
# and "skip_configs" parameters; like the wrapped task, it is not versioned
CreatePileupWeightsWrapper = wrapper_factory(
    base_cls=AnalysisTask,
    require_cls=CreatePileupWeights,
    enable=["configs", "skip_configs"],
    attributes={"version": None},
)
class CheckCATUpdates(ConfigTask, law.tasks.RunOnceTask):
    """
    CMS specific task that checks for updates in the metadata managed and stored by the CAT group. See
    https://cms-analysis-corrections.docs.cern.ch for more info.

    To function correctly, this task requires an auxiliary entry ``cat_info`` in the analysis config, pointing to a
    :py:class:`columnflow.cms_util.CATInfo` instance that defines the era information and the current POG correction
    timestamps. The task will then check in the CAT metadata structure if newer timestamps are available.
    """

    task_namespace = "cf.cms"
    version = None
    single_config = False

    def run(self):
        """
        Loops through all configs and, per POG with a configured snapshot date, compares that date to the newest
        snapshot directory found in the CAT metadata. Newer snapshots are reported and a ``CATSnapshot(...)`` line
        with the updated dates is printed so that it can be copied into the config.

        :raises ValueError: If an existing POG era directory contains no snapshot directory with a numeric date name.
        """
        # helper to convert date strings to tuples for numeric comparisons
        def decode_date_str(s: str) -> tuple[int, ...]:
            return tuple(map(int, s.split("-")))

        # helper to identify names that can be decoded into numeric dates
        def is_date_str(s: str) -> bool:
            try:
                decode_date_str(s)
            except ValueError:
                return False
            return True

        # loop through configs
        for config_inst in self.config_insts:
            if not (cat_info := config_inst.x("cat_info", None)):
                self.logger.warning(f"no 'cat_info' entry found in config '{config_inst.name}', skipping")
                continue

            with self.publish_step(
                f"checking CAT metadata updates for config '{law.util.colored(config_inst.name, style='bright')}' in "
                f"{cat_info.metadata_root}",
            ):
                newest_dates = {}
                updated_any = False
                for pog, date_str in cat_info.snapshot.items():
                    # pogs without a configured snapshot are not checked
                    if not date_str:
                        continue

                    # get all versions in the cat directory, split by date numbers
                    pog_era_dir = os.path.join(
                        cat_info.metadata_root,
                        pog.upper(),
                        cat_info.get_era_directory(pog),
                    )
                    if not os.path.isdir(pog_era_dir):
                        self.logger.warning(f"CAT metadata directory '{pog_era_dir}' does not exist, skipping")
                        continue

                    # the glob pattern also matches names with two dashes that are not numeric dates, which would
                    # make the numeric comparison below fail, so only keep names that can be decoded
                    names = (
                        os.path.basename(path)
                        for path in glob.glob(os.path.join(pog_era_dir, "*-*-*"))
                    )
                    dates = [name for name in names if is_date_str(name)]
                    if not dates:
                        raise ValueError(f"no CAT snapshots found in '{pog_era_dir}'")

                    # compare with current date; "latest" is always replaced by the newest explicit date
                    latest_date_str = max(dates, key=decode_date_str)
                    if date_str == "latest" or decode_date_str(date_str) < decode_date_str(latest_date_str):
                        newest_dates[pog] = latest_date_str
                        updated_any = True
                        self.publish_message(
                            f"found newer {law.util.colored(pog.upper(), color='cyan')} snapshot: {date_str} -> "
                            f"{latest_date_str} ({cat_info.get_era_url(pog, latest_date_str)})",
                        )
                    else:
                        newest_dates[pog] = date_str

                # print a new CATSnapshot line that can be copy-pasted into the config
                if updated_any:
                    args_str = ", ".join(f"{pog}=\"{date_str}\"" for pog, date_str in newest_dates.items() if date_str)
                    self.publish_message(
                        f"{law.util.colored('new CATSnapshot line ->', style='bright')} CATSnapshot({args_str})\n",
                    )
                else:
                    self.publish_message("no updates found\n")