Source code for columnflow.calibration.util
# coding: utf-8
"""
Useful functions for use by calibrators
"""
from __future__ import annotations
from columnflow.types import Callable
from columnflow.columnar_util import flat_np_view, layout_ak_array
from columnflow.util import maybe_import
np = maybe_import("numpy")
ak = maybe_import("awkward")
# https://github.com/scikit-hep/awkward/issues/489\#issuecomment-711090923
[docs]
def ak_random(*args, rand_func: Callable) -> ak.Array:
"""
Return an awkward array filled with random numbers.
The *args* must be broadcastable awkward arrays and will be passed as positional arguments to
*rand_func* to obtain the random numbers.
:param rand_func: Callable to generate random numbers from awkward arrays in *args*.
:return: awkward array filled with random numbers.
"""
args = ak.broadcast_arrays(*args)
if hasattr(args[0].layout, "offsets"):
# pass flat arrays to random function and get random values
np_randvals = rand_func(*map(flat_np_view, args))
# apply layout of first (ak) array
return layout_ak_array(np_randvals, args[0])
# pass args directly (this may fail for some array types)
np_randvals = rand_func(*args)
return ak.from_numpy(np_randvals)
[docs]
def sum_transverse(pt: ak.Array, phi: ak.Array) -> tuple[ak.Array, ak.Array]:
"""
Helper function to compute the sum of transverse vectors given their pt and phi values.
:param pt: Transverse momentum of the vector(s).
:param phi: Azimuthal angle of the vector(s).
:return: Tuple containing the transverse momentum and azimuthal angle of the sum of the vectors.
"""
px_sum = ak.sum(pt * np.cos(phi), axis=-1)
py_sum = ak.sum(pt * np.sin(phi), axis=-1)
# compute new components
pt_sum = (px_sum**2.0 + py_sum**2.0)**0.5
phi_sum = np.arctan2(py_sum, px_sum)
return pt_sum, phi_sum
[docs]
def propagate_met(
jet_pt1: ak.Array,
jet_phi1: ak.Array,
jet_pt2: ak.Array,
jet_phi2: ak.Array,
met_pt1: ak.Array,
met_phi1: ak.Array,
) -> tuple[ak.Array, ak.Array]:
"""
Helper function to compute new MET based on per-jet pts and phis before and after a correction.
Since the pts and phis parameterize the individual jets, the dimensions of the arrays
(*jet_pt1*, *jet_phi1*) as well as (*jet_pt2*, *jet_phi2*) must be the same. The pt values are
decomposed into their x and y components, which are then propagated to the corresponding
contributions to the MET vector
:param jet_pt1: transverse momentum of first jet(s)
:param jet_phi1: azimuthal angle of first jet(s)
:param jet_pt2: transverse momentum of second jet(s)
:param jet_phi2: azimuthal angle of second jet(s)
:param met_pt1: missing transverse momentum (MET)
:param met_phi1: azimuthal angle of MET vector
:raises AssertionError: if arrays (*jet_pt1*, *jet_phi1*) and (*jet_pt2*, *jet_phi2*) have
different dimensions.
:return: updated values of MET vector, i.e. missing transverse momentum and corresponding
azimuthal angle phi.
"""
# avoid unwanted broadcasting
if jet_pt1.ndim != jet_phi1.ndim:
raise Exception(
f"dimension of jet_pt1 {jet_pt1.ndim} does not match dimension of jet_phi1 "
f"{jet_phi1.ndim}",
)
if jet_pt2.ndim != jet_phi2.ndim:
raise Exception(
f"dimension of jet_pt2 {jet_pt2.ndim} does not match dimension of jet_phi2 "
f"{jet_phi2.ndim}",
)
# build px and py sums before and after
jet_px1 = jet_pt1 * np.cos(jet_phi1)
jet_py1 = jet_pt1 * np.sin(jet_phi1)
jet_px2 = jet_pt2 * np.cos(jet_phi2)
jet_py2 = jet_pt2 * np.sin(jet_phi2)
# sum over axis 1 when not already done
if jet_pt1.ndim > 1:
jet_px1 = ak.sum(jet_px1, axis=1)
jet_py1 = ak.sum(jet_py1, axis=1)
if jet_pt2.ndim > 1:
jet_px2 = ak.sum(jet_px2, axis=1)
jet_py2 = ak.sum(jet_py2, axis=1)
# propagate to met
met_px2 = met_pt1 * np.cos(met_phi1) - (jet_px2 - jet_px1)
met_py2 = met_pt1 * np.sin(met_phi1) - (jet_py2 - jet_py1)
# compute new components
met_pt2 = (met_px2**2.0 + met_py2**2.0)**0.5
met_phi2 = np.arctan2(met_py2, met_px2)
return met_pt2, met_phi2