"""
The module :mod:`snewpy.rate_calculator` defines a Python interface for calculating event rates using data from SNOwGLoBES
Reference
---------
.. autoclass:: RateCalculator
:members: run
"""
import numpy as np
from snewpy.snowglobes_interface import SnowglobesData, guess_material
from snewpy.neutrino import Flavor
from snewpy.flux import Container
from astropy import units as u
from warnings import warn
from typing import Dict
from typing import Callable
import scipy.stats as st
#various utility methods
def center(a):
return 0.5*(a[1:]+a[:-1])
u.kt = u.def_unit('kt',represents=1e6<<u.kg, doc='kilotonne')
class SmearingMatrix:
"""A class describing energy smearing, i.e. matrix transforming flux from :math:`E_\nu` (true neutrino energy) to :math:`E_{det}` (observed or smeared energy) space"""
def __init__(self, bins_true:u.Quantity, bins_smeared:u.Quantity, matrix:np.ndarray):
"""
Parameters
----------
bins_true: Quantity[energy]
array of true energy bin limits
bins_smeared: Quantity[energy]
array of smeared (detected) energy bin limits
matrix: 2D numpy array
Smearing matrix of shape ``[len(bins_true)-1,len(bins_smeared)-1]``
"""
self.bins_true = bins_true
self.bins_smeared = bins_smeared
self.matrix = matrix
assert matrix.shape==(len(bins_true)-1,len(bins_smeared)-1), \
f"Matrix shape {matrix.shape} is inconsistent with (bins_true-1, bins_smeared-1)={len(bins_true)-1,len(bins_smeared)-1}"
def apply(self, rate:Container)-> Container:
"""Apply the smearing to the given rate/flux object,
and produce a smeared one
Parameters
----------
rate: Container
Input rate/flux object in true energy space.
If the rate is integrated over energy (binned), the energy bins must be equal to :attr:`self.bins_true`.
Returns
-------
Container
The rate/flux in the smeared energy space.
The energy bins of the returned container are equal to :attr:`self.bins_smeared`
"""
if rate.can_integrate('energy'):
rate = rate.integrate('energy', self.bins_true)
assert np.allclose(rate.energy<<u.GeV, self.bins_true<<u.GeV), \
f"rate.energy: {rate.energy<<u.GeV}!={self.bins_true<<u.GeV}"
rateS_array = np.dot(rate.array, self.matrix)
rateS = Container(rateS_array,
flavor=rate.flavor,
time=rate.time,
energy=self.bins_smeared,
integrable_axes=rate._integrable_axes)
return rateS
@classmethod
def from_Gaussian(cls, bins_true:u.Quantity, bins_smeared:u.Quantity,
mean:Callable, sigma:Callable=1<<u.keV):
"""Construct a smearing matrix corresponding to the Gaussian smearing, with given mean and sigma.
Parameters
----------
bins_true: Quantity[energy]
array of true energy bin limits
bins_smeared: Quantity[energy]
array of smeared (detected) energy bin limits
mean: Callable or u.Quantity[energy]
Mean value of the smeared energy.
Can be a function of E_nu, an array (of size `len(bins_true)-1`) or a scalar.
sigma: Callable or u.Quantity[energy]
Sigma of the smeared energy distribution.
Can be a function of E_nu, an array (of size `len(bins_true)-1`) or a scalar.
Examples
--------
>>> e_true =np.linspace(0,30,501)<<u.MeV
>>> e_smear=np.linspace(0,30,501)<<u.MeV
>>> #IBD process with minimal smearing, but with constant shift E_det = E_nu-0.87 MeV
>>> smear_IBD = SmearingMatrix.from_Gaussian(e_true,e_smear, mean=lambda e_nu:e_nu-0.87*u.MeV)
>>> #NC on C12 process with large smearing and constant energy response at 15.1 MeV
>>> smear_NC_C = SmearingMatrix.from_Gaussian(e_true,e_smear, mean=15.1*u.MeV, sigma=2*u.MeV)
"""
e_t = center(bins_true)<<u.MeV
e_s = bins_smeared<<u.MeV
if callable(mean):
mean = mean(e_t)
if callable(sigma):
sigma = sigma(center(bins_true))
#expand the scalar to required size
mean = np.ones(e_t.shape)*mean
sigma = np.ones(e_t.shape)*sigma
#true energy will be axis=0
mean = np.atleast_2d(mean).T<<u.MeV
sigma = np.atleast_2d(sigma).T<<u.MeV
#smeared energy will be axis=1
e_s = np.atleast_2d(e_s)
distr = st.norm(loc=mean, scale=sigma)
#calculate integral in each bin
cdf = distr.cdf(e_s)
pdf = np.diff(cdf, axis=1)
return cls(bins_true, bins_smeared, pdf)
class FunctionOfEnergy:
"""A wrapper around a given function of energy, for example interaction cross-section, or a detection efficiency.
This helper class will perform a correct operation when multiplied to a :class:`Container` object - rate or flux.
"""
def __init__(self, callable):
"""
Parameters
----------
callable: Callable
A function of one parameter (energy). This can be an analitical function, or an interpolation of a (E,value) dataset
"""
self.value = callable
def __mul__(self, f:Container)->Container:
e = f.energy #Define sample points
if not f.can_integrate('energy'): #we have bins, let's use central values for sampling
e = center(f.energy)
return f*self.value(e)
def __rmul__(self, f:Container)->Container:
#same as multiplication from the left
return self.__mul__(f)
def __call__(self, energy):
return self.value(energy)
@classmethod
def from_threshold(cls, e_min=1<<u.MeV):
return cls(lambda e: 1*(e>e_min))
class DetectionChannel:
"""Description of a single detection channel in the detector"""
def __init__(self, flavor:Flavor, xsec:callable, smearing:SmearingMatrix=None,
efficiency:FunctionOfEnergy=1., weight:float=1.):
"""
Parameters
----------
name:str
channel name
flavor:Flavor
flavor of the interacting neutrino
xsec:callable or FunctionOfEnergy
crossection as a function of energy
smearing:SmearingMatrix
The detection energy smearing matrix.
If `None`(default) no smearing is applied
efficiency: float or np.ndarray or callable.
The detection efficiency vs. detected energy.
If scalar (float), efficiency is independent of energy. Value of `1.` (default) means 100% efficiency.
If it's a vector (np.ndarray), the values correspond to the efficiency in each bin of `smearing.bins_smeared`.
If it's a callable it should define efficiency as a function of energy
weight:float.
Channel weight, to be multiplied to the resulting event rates.
Default value is 1.
"""
self.flavor=flavor
self.xsec = xsec
self.efficiency = efficiency
self.smearing = smearing
self.weight = weight
@property
def flavor(self):
return self._flavor
@flavor.setter
def flavor(self, flavors):
if isinstance(flavors, Flavor):
flavors = [flavors]
self._flavor = flavors
@property
def efficiency(self):
return self._efficiency
@efficiency.setter
def efficiency(self, efficiency):
if callable(efficiency) and not isinstance(efficiency,FunctionOfEnergy):
efficiency = FunctionOfEnergy(efficiency)
self._efficiency = efficiency
@property
def xsec(self):
return self._xsec
@xsec.setter
def xsec(self, xsec):
if callable(xsec) and not isinstance(xsec,FunctionOfEnergy):
xsec = FunctionOfEnergy(xsec)
self._xsec = xsec
def __repr__(self):
return f'{self.__class__.__name__} (flavor={",".join([f.name for f in self.flavor])}, smearing={self.smearing is not None}, weight={self.weight})'
def calc_rate(self, flux:Container, apply_smearing=True, apply_efficiency=True)->Container:
"""Calculate the event rate in this channel
Parameters
----------
flux:Container
Input neutrino flux
apply_smearing:bool
If `True`(default) apply the energy smearing, otherwise the result will be just interaction rate vs. neutrino energy
apply_efficiency:bool
If `True`(default) apply the efficiency, otherwise assume 100% efficiency
"""
rate = self._calc_interaction_rate(flux)
if apply_smearing:
if self.smearing is not None:
rate = self.smearing.apply(rate)
if apply_efficiency:
rate = rate*self.efficiency
return rate
def _calc_interaction_rate(self, flux):
"""calculate interaction rate for given channel"""
tgt_mass = 1<<u.kt
Ntargets = tgt_mass.to_value(u.Dalton)
#sum flux over flavors
array_total = sum([flux[flv].array for flv in self.flavor])
#create a summary flux container
flux_total = Container(array_total, flavor=self.flavor,
time=flux.time, energy=flux.energy,
integrable_axes=flux._integrable_axes)
rate = self.xsec*flux_total*self.weight*Ntargets
return rate
class Detector:
"""A detector configuration for the rate calculation. """
def __init__(self, name:str, mass: u.Quantity, channels: Dict[str,DetectionChannel]):
"""
Parameters
----------
name: str
Detector name
mass: Quantity[mass]
Detector mass
channels: Dict[str,DetectionChannel]
Dictionary of detection channels in the format {name:channel}
Note
----
These parameters and detection channels can be modified later, before calling :meth:`run`
"""
self.name = name
self.mass = mass
self.channels = channels
def __repr__(self):
return f'Detector(name="{self.name}", mass={self.mass}, channels={list(self.channels)})'
def run(self, flux:Container, detector_effects:bool=True)->Dict[str, Container]:
"""Calculate the interaction rates for all channels in the detector.
Parameters
----------
flux:Container
The incoming neutrino flux (or fluence).
detector_effects:bool
If `True` (default) apply the smearing and efficiency for all channels. Otherwise just calculate the interaction rates vs neutrino energy.
Returns
-------
Dict[str,Container]
Event rate for each detection channel in as dictionary {channel name: event rate}
"""
result = {}
for name,channel in self.channels.items():
rate = channel.calc_rate(flux, apply_efficiency=detector_effects, apply_smearing=detector_effects)
result[name] = rate*(self.mass/(1<<u.kt))
return result
def _get_flavor_index(channel):
_map = {'+e':Flavor.NU_E,
'-e':Flavor.NU_E_BAR,
'+m':Flavor.NU_X,
'-m':Flavor.NU_X_BAR,
'+t':Flavor.NU_X,
'-t':Flavor.NU_X_BAR
}
return _map[channel.parity+channel.flavor]
def _bin_edges_from_centers(centers:np.ndarray)->np.ndarray:
"""calculate the bin edges based on the given central values"""
binw = np.diff(centers) #get the bin width
edges = centers-0.5*np.pad(binw,(0,1),mode='edge') #get lower edges
edges = np.append(edges,edges[-1]+binw[-1])
return edges
#--------------------------------------
[docs]
class RateCalculator(SnowglobesData):
r"""Simple rate calculation interface.
Computes expected rate for a detector using SNOwGLoBES data.
Use :meth:`RateCalculator.run` method to run the simulation for specific detector and flux object.
Input flux can either be energy differential flux :math:`F_\nu(E_i) = \frac{d N}{d_E}(E_i)` or integral flux in the energy bins :math:`N_{\nu}(E_i)`.
If the `detector_effects=False` the result is the number of neutrino interactions :math:`N` - a product with with cross-section :math:`\sigma(E)` and the number of targets in the detector :math:`N_{tgt}`:
.. math:: N (E_i) = N_{tgt} \cdot F_\nu(E_i)\cdot \sigma(E_i)
If the detector effects are included, it is integrated within the energy bins and multiplied by the smearing matrix and detection efficiency to get number of interactions in energy bin:
.. math::
N_i = N_{tgt} \cdot \int\limits_{E_i}^{E_{i+1}}~F_\nu(E_i)~\sigma(E_i)~d E
and this is convoluted with the detector smearing matrix :math:`M_{ij}` and efficiency :math:`\varepsilon(E)`:
.. math::
N^{rec}_i = \sum\limits_{j} M_{ij} \cdot N_j \cdot \varepsilon(E_i)
"""
def __init__(self, base_dir=''):
"""
Parameters
----------
base_dir: Path or None
Path to the directory where the cross-section, detector, and channel files are located
If empty, try to get it from ``$SNOWGLOBES`` environment var
"""
super().__init__(base_dir=base_dir)
def load_xsec(self, channel_name:str, flavor:Flavor)->FunctionOfEnergy:
"""Load cross-section for a given channel, interpolated in the energies"""
xsec = np.loadtxt(self.base_dir/f"xscns/xs_{channel_name}.dat")
# Cross-section in 10^-38 cm^2
xp = xsec[:,0]
#get the column to read from the file
column = {Flavor.NU_E:1, Flavor.NU_X:2, Flavor.NU_E_BAR:4, Flavor.NU_X_BAR:5}[flavor]
yp = xsec[:, column]
def xsec(energies):
E = energies.to_value('GeV')
return np.interp(np.log(E)/np.log(10), xp, yp, left=0, right=0)*E*1e-38 <<u.cm**2
return FunctionOfEnergy(xsec)
def read_detector(self, name:str, material:str=None)->Detector:
"""Read the detector configuration from the SNOwGLoBES
Parameters
----------
name:str
Detector name (see :attr:`detectors` for options)
material:str or None
Detector material (see :attr:`materials` for options)
If `None` (default) try to guess material from detector name
Returns
-------
Detector
an object with the detector configuration.
"""
material = material or guess_material(name)
channels = {}
bins = self.binning[material]
bins_t = _bin_edges_from_centers(bins['e_true'])<<u.GeV
bins_s = _bin_edges_from_centers(bins['e_smear'])<<u.GeV
for ch in self.channels[material].itertuples():
try:
smearing = SmearingMatrix(bins_true=bins_t,
bins_smeared=bins_s,
matrix=self.smearings[name][ch.name].T)
except KeyError:
warn(f'Smearing not found for detector={name}, channel={ch.name}. Using unsmeared spectrum')
smearing = None
try:
efficiency= self.efficiencies[name][ch.name]
except KeyError:
warn(f'Efficiency not found for detector={name}, channel={ch.name}. Using 100% efficiency')
efficiency = 1
flavor=_get_flavor_index(ch)
channel = DetectionChannel(
flavor=flavor,
weight=ch.weight*self.detectors[name].factor,
xsec=self.load_xsec(ch.name,flavor),
smearing=smearing,
efficiency=efficiency)
channels[ch.name]=channel
return Detector(name=name,
mass=self.detectors[name].mass<<u.kt,
channels=channels
)
[docs]
def run(self, flux:Container, detector:str, material:str=None, detector_effects:bool = True)->Dict[str, Container]:
"""Run the rate calculation for the given detector.
Parameters
----------
flux: Container
The incoming neutrino flux (or fluence).
detector: str
Name of the detector to calculate the rate.
Check `RateCalculator.detectors` for the list of options
material: str or None
Name of the detector material.
Check `RateCalculator.materials` for the list of options
If `None` (default) it will be guessed based on the name of the detector
detector_effects: bool
If true (default), account for efficiency and smearing. If false, consider a perfect detector.
Returns
-------
dict[str, Container]
A dictionary with interaction rates (as instances of :class:`snewpy.flux.Container`) for each channel.
"""
return self.read_detector(detector,material).run(flux, detector_effects=detector_effects)