# Source code for columnflow.tasks.external

# coding: utf-8

"""
Tasks dealing with external data.
"""

from __future__ import annotations

import os
import time
import shutil
import shlex
import subprocess
import dataclasses
import copy
from collections.abc import Iterator

import luigi
import law
import order as od

from columnflow import env_is_local
from columnflow.tasks.framework.base import AnalysisTask, ConfigTask, DatasetTask, wrapper_factory
from columnflow.tasks.framework.parameters import user_parameter_inst
from columnflow.tasks.framework.decorators import only_local_env
from columnflow.util import wget, DotDict
from columnflow.types import Sequence, ClassVar


# module-level logger named after this module, used for debug/info/warning messages emitted while
# locating and fetching lfns
logger = law.logger.get_logger(__name__)


class GetDatasetLFNs(DatasetTask, law.tasks.TransferLocalFile):
    """
    Task to get list of logical file names (LFNs).
    """

    replicas = luigi.IntParameter(
        default=5,
        description="number of replicas to generate; default: 5",
    )
    validate = law.OptionalBoolParameter(
        default=None,
        significant=False,
        description="when True, complains if the number of obtained LFNs does not match the value "
        "expected from the dataset info; default: obtained from 'validate_dataset_lfns' auxiliary "
        "entry in config",
    )

    version = None
    """Version parameter - deactivated for :py:class:`~columnflow.tasks.external.GetDatasetLFNs`
    """

    @classmethod
    def resolve_param_values(cls, params: DotDict) -> DotDict:
        """
        Resolve parameter values *params* from command line and propagate them to this set of parameters.

        When *validate* is not set explicitly, it is taken from the ``validate_dataset_lfns`` auxiliary
        entry of the config (defaulting to *False*).

        :param params: Parameters provided at command line level.
        :return: Updated list of parameter values.
        """
        params = super().resolve_param_values(params)

        # use the config default for "validate" when it was not set explicitly
        if "config_inst" in params and params.get("validate") is None:
            config_inst = params["config_inst"]
            params["validate"] = config_inst.x("validate_dataset_lfns", False)

        return params

    @property
    def sandbox(self) -> str | None:
        """
        Defines the sandbox for this task.

        The sandbox can be configured through the ``get_dataset_lfns_sandbox`` auxiliary config entry and
        defaults to sourcing the CMS default environment. Empty values or ``law.NO_STR`` disable it.

        :return: Sandbox specification, or *None* when no sandbox should be used.
        """
        sandbox = self.config_inst.x("get_dataset_lfns_sandbox", None)
        if sandbox is None:
            sandbox = "bash::/cvmfs/cms.cern.ch/cmsset_default.sh"
        return sandbox if sandbox and sandbox != law.NO_STR else None

    def single_output(self) -> law.target.file.FileSystemFileTarget:
        """
        Creates a remote target file for the final .json file containing the list of LFNs.

        The file name contains a hash of the sorted dataset keys so that changes to the keys lead to a new
        output.

        :return: Law remote target with the initialized output name
        """
        # required by law.tasks.TransferLocalFile
        h = law.util.create_hash(sorted(self.dataset_info_inst.keys))
        return self.target(f"lfns_{h}.json")

    @only_local_env
    @law.decorator.notify
    @law.decorator.log
    def run(self):
        """
        Run function for this task.

        LFNs are obtained per dataset key, either through a callable stored in the ``get_dataset_lfns``
        auxiliary config entry (called with the dataset instance, the global shift instance and the key), or
        via :py:meth:`get_dataset_lfns_dasgoclient` when no such callable is configured.

        :raises ValueError: If validation is enabled and the number of loaded LFNs does not correspond to
            the number of files specified in ``dataset_info_inst``.
        """
        # prepare the lfn getter
        get_dataset_lfns = self.config_inst.x("get_dataset_lfns", None)
        msg = "via custom config function"
        if not callable(get_dataset_lfns):
            get_dataset_lfns = self.get_dataset_lfns_dasgoclient
            msg = "via dasgoclient"

        lfns = []
        for key in sorted(self.dataset_info_inst.keys):
            self.logger.info(f"get lfns for dataset key {key} {msg}")
            lfns.extend(get_dataset_lfns(self.dataset_inst, self.global_shift_inst, key))

        if self.validate and len(lfns) != self.dataset_info_inst.n_files:
            raise ValueError(
                f"number of obtained lfns ({len(lfns)}) does not match number of files "
                f"for dataset {self.dataset_inst.name} ({self.dataset_info_inst.n_files})",
            )

        self.logger.info(f"found {len(lfns):_} lfn(s) for dataset {self.dataset}")

        tmp = law.LocalFileTarget(is_tmp=True)
        tmp.dump(lfns, indent=4, formatter="json")
        self.transfer(tmp)

    def get_dataset_lfns_dasgoclient(
        self,
        dataset_inst: od.Dataset,
        shift_inst: od.Shift,
        dataset_key: str,
    ) -> list[str]:
        """
        Get the LFN information with the ``dasgoclient``.

        Only lines ending in ``.root`` are considered. Files listed in the ``broken_files`` auxiliary entry
        of the dataset info object selected by *shift_inst* are skipped.

        :param dataset_inst: Current dataset instance, used to look up its broken files.
        :param shift_inst: Current shift instance, whose name selects the dataset info object.
        :param dataset_key: DAS key identifier for the current dataset.
        :raises Exception: If query with ``dasgoclient`` fails.
        :return: The list of LFNs corresponding to the dataset with the identifier *dataset_key*.
        """
        # quote the query so that the key is passed to dasgoclient as a single argument
        query = shlex.quote(f"file dataset={dataset_key}")
        code, out, err = law.util.interruptable_popen(
            f"dasgoclient --query={query} --limit=0",
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
            kill_timeout=1,
        )
        if code != 0:
            raise Exception(f"dasgoclient query failed:\n{out}\n{err}")

        # set for O(1) membership checks per returned line
        broken_files = set(dataset_inst[shift_inst.name].get_aux("broken_files", []))

        lfns = []
        for line in out.strip().split("\n"):
            lfn = line.strip()
            if lfn.endswith(".root") and lfn not in broken_files:
                lfns.append(lfn)
        return lfns

    def iter_nano_files(
        self,
        task: AnalysisTask | DatasetTask,
        fs: str | Sequence[str] | None = None,
        lfn_indices: list[int] | None = None,
        eager_lookup: bool | int = 1,
        skip_fallback: bool = False,
    ) -> Iterator[tuple[int, law.FileSystemFileTarget]]:
        """
        Generator function that reduces the boilerplate code for looping over files referred to by
        *lfn_indices* given the lfns obtained by *this* task which needs to be complete for this function to
        succeed. When *lfn_indices* are not given, *task* must be a branch of a :py:class:`DatasetTask`
        workflow whose branch value is used instead.

        When *fs* is not given, it is obtained from the optional ``get_dataset_lfns_remote_fs`` config hook
        (called with the dataset instance) and, if still empty, from the ``lfn_sources`` option in the
        ``outputs`` section of the law config. The file systems are then checked in order.

        :param task: Current task that needs to access the nanoAOD files
        :param fs: Name(s) of the local or remote file system(s) where the LFNs are located, defaults to
            None
        :param lfn_indices: List of indices of LFNs that are processed by this *task* instance, defaults to
            None
        :param eager_lookup: When a successful stat query takes longer than 4 seconds, the next fs is checked
            as well and the faster one is used. When an integer, eager lookup is not applied to the first
            *eager_lookup* checked fs's; *False* disables it. Defaults to 1.
        :param skip_fallback: Skip the fallback mechanism to fetch the LFN, defaults to False
        :raises TypeError: If *lfn_indices* are not given and *task* is not a branch of a
            :external+law:py:class:`~law.workflow.base.BaseWorkflow`
        :raises Exception: If current task is not complete as indicated with ``self.complete()``
        :raises ValueError: If no fs is provided at call and none can be found in either the config instance
            or the law config.
        :raises Exception: If a given LFN cannot be found at any fs
        :yield: Tuples of the LFN index and a file target that points to the LFN
        """
        # input checks
        if not lfn_indices:
            if not isinstance(task, law.BaseWorkflow) or not task.is_branch():
                raise TypeError(f"task must be a workflow branch, but got {task}")
            lfn_indices = task.branch_data

        if not self.complete():
            raise Exception(f"{self} is required to be complete")

        # prepare fs names to resolve lfns with
        if not fs:
            # use an optional hook in the config
            get_fs = self.config_inst.x("get_dataset_lfns_remote_fs", None)
            if callable(get_fs):
                fs = get_fs(self.dataset_inst)
        if not fs:
            # use the law config
            fs = law.config.get_expanded("outputs", "lfn_sources", [], split_csv=True)
        if not fs:
            raise ValueError("no fs given or found to resolve lfns")
        fs = law.util.make_list(fs)

        # get all lfns
        output = self.output()
        target = output.random_target() if isinstance(output, law.TargetCollection) else output
        lfns = target.load(formatter="json")

        # when the stat query takes longer than this duration (in seconds), eagerly try the next fs
        latency = 4.0

        # loop
        for lfn_index in lfn_indices:
            # get the lfn of the file referenced by this file index
            lfn = str(lfns[lfn_index])

            # get the input file
            i = 0
            # (fs, target, stat, duration) of a working but slow fs, kept while checking the next one
            last_working = None
            while i < len(fs):
                selected_fs = fs[i]
                logger.debug(f"checking fs {selected_fs} for lfn {lfn}")

                # check if the fs is really remote or local
                fs_base = law.config.get_expanded(selected_fs, "base")
                is_local = law.target.file.get_scheme(fs_base) in (None, "file")
                logger.debug(f"fs {selected_fs} is {'local' if is_local else 'remote'}")
                target_cls = law.LocalFileTarget if is_local else law.wlcg.WLCGFileTarget

                # try an optional fallback to pre-emptively fetch the lfn if necessary
                if not is_local and not skip_fallback:
                    input_file, input_stat, is_tmp = self._fetch_lfn_fallback(lfn, selected_fs)
                    if input_file:
                        if is_tmp:
                            input_file.is_tmp = True
                        task.publish_message(f"using fs {selected_fs} via pre-emptive fetch")
                        break

                # measure the time required to perform the stat query
                input_file = target_cls(lfn.lstrip(os.sep) if is_local else lfn, fs=selected_fs)
                t1 = time.perf_counter()
                input_stat = input_file.exists(stat=True)
                duration = time.perf_counter() - t1
                i += 1
                logger.info(f"lfn {lfn} (lfn {lfn_index}) does{'' if input_stat else ' not'} exist at fs {selected_fs}")

                # when the stat query took longer than some duration, eagerly try the next fs
                # and check if it responds faster and if so, take it instead
                if input_stat and eager_lookup and not last_working:
                    skip_eager = (
                        isinstance(eager_lookup, int) and
                        not isinstance(eager_lookup, bool) and
                        i <= eager_lookup
                    )
                    if skip_eager:
                        logger.debug(f"eager fs lookup skipped for fs {selected_fs} at index {i}")
                    elif duration > latency and i < len(fs):
                        last_working = selected_fs, input_file, input_stat, duration
                        logger.debug(f"duration exceeded {latency}s, checking next fs for comparison")
                        continue

                # fall back to the previously found fs when the current one failed or was even slower
                # (this must not depend on input_stat, otherwise a failing next fs would discard it)
                if last_working and (not input_stat or last_working[3] < duration):
                    if input_stat:
                        logger.debug("previously checked fs responded faster")
                    else:
                        logger.debug("falling back to previously checked fs")
                    selected_fs, input_file, input_stat, duration = last_working

                # stop when the stat was successful at this point
                if input_stat:
                    task.publish_message(
                        f"using fs {selected_fs}, stat responded in {law.util.human_duration(seconds=duration)}",
                    )
                    break
            else:
                raise Exception(f"lfn {lfn} (lfn {lfn_index}) not found at any remote fs {fs}")

            # log the file size
            input_size = law.util.human_bytes(input_stat.st_size, fmt=True)
            task.publish_message(f"lfn {lfn} (lfn {lfn_index}), size is {input_size}")

            yield (lfn_index, input_file)

    def _fetch_lfn_fallback(
        self,
        lfn: str,
        selected_fs: str,
        force: bool = False,
    ) -> tuple[law.LocalFileTarget | None, os.stat_result | None, bool]:
        """
        Fetches an *lfn* via fallback mechanisms. Currently, only ``xrdcp`` for remote file systems
        *selected_fs* with `root://` bases is supported. Unless *force* is *True*, no fallbacks are performed
        in case they are not necessary in the first place (determined by the availability of the ``gfal2``
        package).

        :param lfn: Logical file name to fetch.
        :param selected_fs: Name of the file system to fetch the LFN from. When remote, its *base* or
            *base_filecopy* should use the `root://` protocol.
        :param force: When *True*, forces the fallback to be performed, defaults to *False*.
        :raises NotImplementedError: If the base of *selected_fs* does not use the `root://` protocol.
        :return: Tuple of the fetched file, its stat, and a flag indicating whether the file is temporary.
            *None*'s are returned when the file was not fetched.
        """
        # check if the file needs to be fetched in the first place
        no_result = None, None, False
        if not force:
            # when gfal2 is available, no need to fetch
            try:
                import gfal2  # noqa: F401
                return no_result
            except ImportError:
                pass

        # get the base uri and check if the protocol is supported
        base = (
            law.config.get_expanded(selected_fs, "base_filecopy", None) or
            law.config.get_expanded(selected_fs, "base")
        )
        scheme = law.target.file.get_scheme(base)
        if scheme != "root":
            raise NotImplementedError(f"fetching lfn via {scheme}:// is not supported")
        uri = base + lfn

        # if the corresponding fs has a cache and the lfn is already in there, return it
        # (no need to perform in/validation checks via mtime for lfns)
        wlcg_fs = law.wlcg.WLCGFileSystem(selected_fs)
        if wlcg_fs.cache and lfn in wlcg_fs.cache:
            destination = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn))
            return destination, destination.stat(), False

        # fetch the file into a temporary location first
        destination = law.LocalFileTarget(is_tmp="root")
        cmd = f"xrdcp -f {shlex.quote(uri)} {shlex.quote(destination.abspath)}"
        code = law.util.interruptable_popen(cmd, shell=True, executable="/bin/bash")[0]
        if code != 0:
            logger.warning(f"xrdcp failed for {uri}")
            return no_result

        # when there is a cache, move the file there
        stat = destination.stat()
        if wlcg_fs.cache:
            with wlcg_fs.cache.lock(lfn):
                wlcg_fs.cache.allocate(stat.st_size)
                clfn = law.LocalFileTarget(wlcg_fs.cache.cache_path(lfn))
                destination.move_to_local(clfn)
            return clfn, stat, False

        # here, the destination will be temporary, but set its tmp flag to False to prevent its
        # deletion when this method goes out of scope, and set the decision for later use instead
        destination.is_tmp = False
        return destination, stat, True
GetDatasetLFNsWrapper = wrapper_factory( base_cls=AnalysisTask, require_cls=GetDatasetLFNs, enable=["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"], attributes={"version": None}, docs=""" Wrapper task to get LFNs for multiple datasets. :enables: ["configs", "skip_configs", "datasets", "skip_datasets", "shifts", "skip_shifts"] :overwrites: attribute ``version`` with None """, )
@dataclasses.dataclass
class ExternalFile:
    """
    Container object to define an external file resource that is understood by (e.g.)
    :py:class:`tasks.external.BundleExternalFiles`.

    *subpaths* can be a dictionary mapping names to paths inside *location* (a directory or an archive), a
    single string (accessible through the ``single_key`` entry, setting *single* to *True*), or a list or
    tuple of paths (stored under their integer positions). Entries whose value is *None* are removed.
    Subpaths with string names are also accessible as attributes.

    Example:

    .. code-block:: python

        # refer to a simple file location
        ExternalFile(location="path/to/file", version="v1")

        # refer to a directory or archive that contains multiple files
        ExternalFile(location="some/archive.tgz", subpaths={"file_name": "file/in/archive"}, version="v1")
    """

    location: str
    subpaths: dict[str, str] = dataclasses.field(default_factory=dict)
    version: str = "v1"
    single: bool = dataclasses.field(init=False, default=False)

    # key under which a single, plain-string subpath is stored in *subpaths*
    single_key: ClassVar[str] = "_single_key"

    @classmethod
    def new(cls, resource: ExternalFile | str | tuple[str] | tuple[str, str]) -> ExternalFile:
        """
        Factory method to create a new instance of :py:class:`ExternalFile` with backwards-compatible
        parsing of simple strings and tuples.

        :param resource: An existing instance (returned as is), a location string, or a ``(location,)`` or
            ``(location, version)`` list or tuple.
        :raises ValueError: If *resource* has none of the supported types or formats.
        :return: The :py:class:`ExternalFile` instance.
        """
        if isinstance(resource, cls):
            return resource
        if isinstance(resource, str):
            return cls(location=resource)
        if isinstance(resource, (list, tuple)):
            if len(resource) == 1:
                return cls(location=resource[0])
            if len(resource) == 2:
                return cls(location=resource[0], version=resource[1])
        raise ValueError(f"invalid resource type and format: {resource}")

    def __post_init__(self) -> None:
        # convert different types of subpaths to dict
        if self.subpaths is None:
            self.subpaths = DotDict()
        elif isinstance(self.subpaths, str):
            self.subpaths = DotDict({self.single_key: self.subpaths})
            self.single = True
        elif isinstance(self.subpaths, (list, tuple)):
            # map positional subpaths to their index (enumerate already yields (key, value) pairs)
            self.subpaths = DotDict(enumerate(self.subpaths))
        else:
            self.subpaths = DotDict.wrap(copy.deepcopy(self.subpaths))

        # remove None's
        for key in list(self.subpaths.keys()):
            if self.subpaths[key] is None:
                del self.subpaths[key]

    def __str__(self) -> str:
        sub = ""
        if self.subpaths:
            if self.single:
                sub = f"/{self.subpaths[self.single_key]}"
            else:
                sub = " / " + ",".join(f"{n}={p}" for n, p in self.subpaths.items())
        return f"{self.location}{sub} ({self.version})"

    def __getattr__(self, attr: str) -> str:
        # read subpaths through the instance dict: instances created without __init__ (e.g. by copy or
        # pickle) have no "subpaths" yet, and accessing self.subpaths would re-enter __getattr__ endlessly
        subpaths = self.__dict__.get("subpaths") or {}
        if attr in subpaths:
            return subpaths[attr]
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'")
class BundleExternalFiles(ConfigTask, law.tasks.TransferLocalFile):
    """
    Task to collect external files.

    This task is intended to download source files for other tasks, such as files containing corrections
    for objects, the "golden" json files, source files for the calculation of pileup weights, and others.

    All information about the relevant external files is extracted from the given ``config_inst``, which
    must contain an auxiliary field ``external_files`` like the following (all entries are optional and
    user-defined):

    .. code-block:: python

        # cfg is the current config instance
        cfg.x.external_files = DotDict.wrap({
            "jet_jerc": ExternalFile(f"{SOURCE_URL}/POG/JME/{year}{corr_postfix}_UL/jet_jerc.json.gz", version="v1"),

            # tau energy correction and scale factors
            "tau_sf": ExternalFile(f"{SOURCE_URL}/POG/TAU/{year}{corr_postfix}_UL/tau.json.gz", version="v1"),

            # electron scale factors
            "electron_sf": ExternalFile(f"{SOURCE_URL}/POG/EGM/{year}{corr_postfix}_UL/electron.json.gz", version="v1"),
        })

    All entries should be :py:class:`ExternalFile` instances (other entries are converted via
    :py:meth:`ExternalFile.new`).
    """

    single_config = True

    replicas = luigi.IntParameter(
        default=5,
        description="number of replicas to generate; default: 5",
    )
    recreate = luigi.BoolParameter(
        default=False,
        significant=False,
        description="when True, forces the recreation of the bundle even if it exists; default: False",
    )
    user = user_parameter_inst

    # no version parameter for this task
    version = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # external files, casted to ExternalFile instances once
        self.ext_files = law.util.map_struct(ExternalFile.new, self.config_inst.x.external_files)

        # cached hash
        self._files_hash = None

        # cached dictionary with the same structure as external files, mapping to unique basenames
        self._file_names = None

        # cached dict for lazy access to files in fetched bundle
        self._files_collection = None

    @classmethod
    def create_unique_basename(cls, path: str | ExternalFile) -> str | dict[str, str]:
        """
        Creates a unique basename for *path* by prefixing its basename with a hash of the full path.

        For :py:class:`ExternalFile` instances with multiple subpaths, a dictionary of the same type as its
        *subpaths* is returned, mapping subpath names to unique basenames.

        :param path: Path string or :py:class:`ExternalFile` instance.
        :return: Unique basename or dictionary of unique basenames.
        """
        if isinstance(path, str):
            return f"{law.util.create_hash(path)}_{os.path.basename(path)}"

        # path must be an ExternalFile
        if not path.subpaths:
            return cls.create_unique_basename(path.location)

        # single mode
        if path.single:
            return cls.create_unique_basename(os.path.join(path.location, path.subpaths[path.single_key]))

        # multiple subpaths
        return type(path.subpaths)(
            (name, cls.create_unique_basename(os.path.join(path.location, subpath)))
            for name, subpath in path.subpaths.items()
        )

    @property
    def files_hash(self) -> str:
        """
        Hash of all external files, computed once from a deterministically ordered flat representation.
        """
        if self._files_hash is None:
            # take the external files and flatten them into a deterministic order, then hash
            def deterministic_flatten(d):
                return [
                    (key, (deterministic_flatten(d[key]) if isinstance(d[key], dict) else d[key]))
                    for key in sorted(d)
                ]

            flat_files = deterministic_flatten(self.ext_files)
            self._files_hash = law.util.create_hash(flat_files)

        return self._files_hash

    @property
    def file_names(self) -> DotDict:
        """
        Structure of unique basenames mirroring the structure of the external files (cached).
        """
        if self._file_names is None:
            self._file_names = law.util.map_struct(self.create_unique_basename, self.ext_files)
        return self._file_names

    def get_files_collection(self, output=None) -> law.SiblingFileCollection:
        """
        Returns the collection of local, unpacked external files (cached after the first call).

        :param output: Optional output structure of this task, defaults to :py:meth:`output`.
        :raises Exception: If the local files do not exist yet.
        :return: The collection of local file targets.
        """
        if self._files_collection is None:
            # get the output
            if not output:
                output = self.output()
            if not output["local_files"].exists():
                raise Exception(
                    f"accessing external files from the bundle requires the output of {self} to exist, but it appears "
                    "to be missing",
                )
            self._files_collection = output["local_files"]

        return self._files_collection

    @property
    def files(self) -> DotDict:
        """
        Local targets of the unpacked external files.
        """
        return self.get_files_collection().targets

    @property
    def files_dir(self) -> law.LocalDirectoryTarget:
        """
        Directory containing the unpacked external files.
        """
        return self.get_files_collection().dir

    def single_output(self):
        # required by law.tasks.TransferLocalFile
        return self.target(f"externals_{self.files_hash}.tgz")

    def output(self):
        """
        Returns the transferred bundle and the collection of local, unpacked files.
        """
        def local_target(basename):
            path = os.path.join(f"externals_{self.files_hash}", basename)
            # simple heuristic, but the type is actually checked after unpacking in run()
            is_dir = "." not in basename
            return self.local_target(path, dir=is_dir)

        return DotDict(
            bundle=super().output(),
            local_files=law.SiblingFileCollection(law.util.map_struct(local_target, self.file_names)),
        )

    def trace_transfer_output(self, output):
        return output["bundle"]

    @law.decorator.notify
    @law.decorator.log
    def run(self):
        """
        Creates the bundle of external files (when missing or when recreating) and unpacks it locally.

        :raises RuntimeError: If the bundle is missing and the environment is not local.
        :raises NotImplementedError: If a resource cannot be fetched.
        :raises Exception: If an unpacked target is a file where a directory is expected or vice versa.
        """
        outputs = self.output()

        # remove the bundle if recreating
        if self.recreate and outputs["bundle"].exists():
            outputs["bundle"].remove()

        # bundle only if needed
        if not outputs["bundle"].exists():
            if not env_is_local:
                raise RuntimeError(
                    f"the output bundle {self.single_output().abspath} is missing, but cannot be created in non-local "
                    "environments",
                )

            # create a tmp dir to work in
            tmp_dir = law.LocalDirectoryTarget(is_tmp=True)
            tmp_dir.touch()

            # create a scratch directory for temporary downloads that will not be bundled
            scratch_dir = tmp_dir.child("scratch", type="d")
            scratch_dir.touch()

            # progress callback
            progress = self.create_progress_callback(len(law.util.flatten(self.ext_files)))

            # helper to fetch a single src to dst
            def fetch(src, dst):
                if src.startswith(("http://", "https://")):
                    # download via wget
                    wget(src, dst)
                elif os.path.isfile(src):
                    # copy local file
                    shutil.copy2(src, dst)
                elif os.path.isdir(src):
                    # copy local dir
                    shutil.copytree(src, dst)
                else:
                    err = f"cannot fetch {src}"
                    if src.startswith("/") and os.path.isdir("/".join(src.split("/", 2)[:2])):
                        err += ", file or directory does not exist"
                    else:
                        err += ", resource type is not supported"
                    raise NotImplementedError(err)

            # number of external files fetched so far, used for progress reporting
            n_fetched = 0

            # helper function to fetch generic files
            def fetch_file(ext_file):
                nonlocal n_fetched

                if not ext_file.subpaths:
                    # copy directly to the bundle dir
                    src = ext_file.location
                    dst = os.path.join(tmp_dir.abspath, self.create_unique_basename(ext_file.location))
                    fetch(src, dst)
                else:
                    # copy to scratch dir first in case a subpath is requested; the same location can be
                    # shared by multiple external files, so only fetch it once
                    basename = self.create_unique_basename(ext_file.location)
                    scratch_dst = os.path.join(scratch_dir.abspath, basename)
                    if not os.path.exists(scratch_dst):
                        fetch(ext_file.location, scratch_dst)

                    # when not a directory, assume the file is an archive and unpack it (once)
                    if os.path.isdir(scratch_dst):
                        scratch_src = scratch_dst
                    else:
                        arc_dir = scratch_dir.child(basename.split(".")[0] + "_unpacked", type="d")
                        if not arc_dir.exists():
                            self.publish_message(f"unpacking {scratch_dst}")
                            law.LocalFileTarget(scratch_dst).load(arc_dir)
                        scratch_src = arc_dir.abspath

                    # copy all subpaths
                    if ext_file.single:
                        fetch(
                            os.path.join(scratch_src, ext_file.subpaths[ext_file.single_key]),
                            os.path.join(tmp_dir.abspath, self.create_unique_basename(ext_file)),
                        )
                    else:
                        basenames = self.create_unique_basename(ext_file)
                        for name, subpath in ext_file.subpaths.items():
                            fetch(os.path.join(scratch_src, subpath), os.path.join(tmp_dir.abspath, basenames[name]))

                # log
                self.publish_message(f"fetched {ext_file}")
                progress(n_fetched)
                n_fetched += 1

            # fetch all files and cleanup scratch dir
            law.util.map_struct(fetch_file, self.ext_files)
            scratch_dir.remove()

            # create the bundle
            tmp = law.LocalFileTarget(is_tmp="tgz")
            tmp.dump(tmp_dir, formatter="tar")

            # log the file size
            bundle_size = law.util.human_bytes(tmp.stat().st_size, fmt=True)
            self.publish_message(f"bundle size is {bundle_size}")

            # transfer the result
            self.transfer(tmp, outputs["bundle"])

        # remove all local files if recreating or if only existing partially to do a full refresh
        local_files_exist = outputs["local_files"].exists()
        if self.recreate or not local_files_exist:
            outputs["local_files"].dir.remove()
            local_files_exist = False

        # unpack the bundle to have local files available if needed
        if not local_files_exist:
            with self.publish_step(f"unpacking to {outputs['local_files'].dir.abspath} ..."):
                bundle = outputs["bundle"]
                if isinstance(bundle, law.FileCollection):
                    bundle = bundle.random_target()
                bundle.load(outputs["local_files"].dir, formatter="tar")

        # check if unpacked files/directories are described by the correct target class
        for target in outputs["local_files"]._flat_target_list:
            mismatch = (
                (isinstance(target, law.FileSystemFileTarget) and not os.path.isfile(target.abspath)) or
                (isinstance(target, law.FileSystemDirectoryTarget) and not os.path.isdir(target.abspath))
            )
            if mismatch:
                raise Exception(f"mismatching file/directory type of unpacked target {target!r}")
# wrapper task (built by wrapper_factory on top of AnalysisTask) that requires BundleExternalFiles for
# multiple configs, selectable via the enabled parameters below
BundleExternalFilesWrapper = wrapper_factory(
    base_cls=AnalysisTask,
    require_cls=BundleExternalFiles,
    enable=["configs", "skip_configs"],
    # the wrapper has no version parameter, consistent with BundleExternalFiles itself
    attributes={"version": None},
    docs="""
    Wrapper task to trigger the BundleExternalFiles task for multiple configs.

    :enables: ["configs", "skip_configs"]
    :overwrites: attribute ``version`` with None
    """,
)