import numpy as np
import h5py
import warnings
import json
import dill
import copy
import importlib
from figaro.exceptions import FIGAROException
from figaro.mixture import mixture
from figaro.cosmology import Planck18, Planck15, dVdz_approx_planck18, dVdz_approx_planck15
from pathlib import Path
from tqdm import tqdm
supported_extensions = ['h5', 'hdf5', 'txt', 'dat', 'csv']
supported_waveforms = ['combined', 'imr', 'seob']
injected_pars = ['m1', 'm2', 'z', 's1x', 's1y', 's1z', 's2x', 's2y', 's2z', 'ra', 'dec']
loadable_inj_pars = injected_pars + ['q', 'chi_eff', 'chi_p', 's1', 's2']
mass_parameters = ['m1', 'm2', 'm1_detect', 'm2_detect', 'mc', 'mt', 'q']
spin_parameters = ['s1x', 's1y', 's1z', 's2x', 's2y', 's2z', 's1', 's2', 'chi_eff', 'chi_p']
GW_par = {'m1' : 'mass_1_source',
'm2' : 'mass_2_source',
'm1_detect' : 'mass_1',
'm2_detect' : 'mass_2',
'mc' : 'chirp_mass_source',
'mc_detect' : 'chirp_mass',
'mt' : 'total_mass_source',
'z' : 'redshift',
'q' : 'mass_ratio',
'eta' : 'symmetric_mass_ratio',
'chi_eff' : 'chi_eff',
'chi_p' : 'chi_p',
'ra' : 'ra',
'dec' : 'dec',
'luminosity_distance': 'luminosity_distance',
'cos_theta_jn' : 'cos_theta_jn',
'cos_tilt_1' : 'cos_tilt_1',
'cos_tilt_2' : 'cos_tilt_2',
's1x' : 'spin_1x',
's2x' : 'spin_2x',
's1y' : 'spin_1y',
's2y' : 'spin_2y',
's1z' : 'spin_1z',
's2z' : 'spin_2z',
's1' : 'spin_1',
's2' : 'spin_2',
'psi' : 'psi',
'cos_iota' : 'cos_iota',
'phase' : 'phase',
'tc' : 'geocent_time',
'snr' : 'network_matched_filter_snr',
'far' : 'far',
'log_prior' : 'log_prior',
'log_likelihood' : 'log_likelihood',
}
# LVK 03 injections have a slightly different nomenclature (no underscore)
# See https://zenodo.org/records/7890437
inj_par = copy.deepcopy(GW_par)
inj_par['m1'] = 'mass1_source'
inj_par['m2'] = 'mass2_source'
inj_par['m1_detect'] = 'mass1'
inj_par['m2_detect'] = 'mass2'
inj_par['s1x'] = 'spin1x'
inj_par['s1y'] = 'spin1y'
inj_par['s1z'] = 'spin1z'
inj_par['s2x'] = 'spin2x'
inj_par['s2y'] = 'spin2y'
inj_par['s2z'] = 'spin2z'
inj_par['luminosity_distance'] = 'distance'
supported_pars = [p for p in GW_par.keys() if p not in ['snr', 'far']]
[docs]
def available_gw_pars():
"""
Print a list of available GW parameters
"""
print(supported_pars)
[docs]
def load_single_event(event, seed = False, par = None, n_samples = -1, cosmology = 'Planck18', volume = False, waveform = 'combined', snr_threshold = None, far_threshold = None, likelihood = False):
'''
Loads the data from .txt/.dat files (for simulations) or .h5/.hdf5 files (posteriors from GWTC) for a single event.
Default cosmological parameters from Planck Collaboration (2021) in a flat Universe (https://www.aanda.org/articles/aa/pdf/2020/09/aa33910-18.pdf)
Not all GW parameters are implemented: run figaro.load.available_gw_pars() for a list of the available parameters.
Arguments:
str or Path file: file with samples
bool seed: fixes the seed to a default value (1) for reproducibility
list-of-str par: list with parameter(s) to extract from GW posteriors (m1, m2, mc, z, chi_effective)
int n_samples: number of samples for (random) downsampling. Default -1: all samples
str cosmology: set of cosmological parameters (Planck18 or Planck15)
bool volume: if True, loads RA, dec and Luminosity distance (for skymaps)
str waveform: waveform family to be used ('combined', 'seob', 'imr')
double snr_threhsold: SNR threshold for event filtering. For injection analysis only.
double far_threshold: FAR threshold for event filtering. For injection analysis only.
bool likelihood: resample to get likelihood samples
Returns:
np.ndarray: samples
np.ndarray: name
'''
if seed:
rdstate = np.random.RandomState(seed = 1)
else:
rdstate = np.random.RandomState()
event = Path(event).resolve()
name, ext = str(event).split('/')[-1].split('.')
if volume:
par = ['ra', 'dec', 'luminosity_distance']
if ext not in supported_extensions:
raise TypeError("File {0}.{1} is not supported".format(name, ext))
if ext not in ['h5', 'hdf5']:
if par is not None:
warnings.warn("Par names (or volume keyword) are ignored for .txt/.dat/.csv files")
if n_samples > -1:
samples = np.atleast_1d(np.loadtxt(event))
s = int(min([n_samples, len(samples)]))
out = samples[rdstate.choice(np.arange(len(samples)), size = s, replace = False)]
else:
out = np.loadtxt(event)
else:
# Check that a list of parameters is passed
if par is None:
raise TypeError("Please provide a list of parameter names you want to load (e.g. ['m1']).")
# Check that all the parametes are loadable
if not np.all([p in GW_par.keys() for p in par]):
wrong_pars = [p for p in par if p not in GW_par.keys()]
raise FIGAROException("The following parameters are not implemented: "+", ".join(wrong_pars)+". Run figaro.load.available_gw_pars() for a list of available parameters.")
# If everything is ok, load the samples
else:
out = _unpack_gw_posterior(event, par = par, n_samples = n_samples, cosmology = cosmology, rdstate = rdstate, waveform = waveform, snr_threshold = snr_threshold, far_threshold = far_threshold, likelihood = likelihood)
if out is None:
return out, name
if len(np.shape(out)) == 1:
out = np.atleast_2d(out).T
return out, name
[docs]
def load_data(path, seed = False, par = None, n_samples = -1, cosmology = 'Planck18', volume = False, waveform = 'combined', snr_threshold = None, far_threshold = None, verbose = True, likelihood = False):
'''
Loads the data from .txt files (for simulations) or .h5/.hdf5/.dat files (posteriors from GWTC-x).
Default cosmological parameters from Planck Collaboration (2021) in a flat Universe (https://www.aanda.org/articles/aa/pdf/2020/09/aa33910-18.pdf)
Not all GW parameters are implemented: run figaro.load.available_gw_pars() for a list of available parameters.
Arguments:
str or Path path: folder with data files
bool seed: fixes the seed to a default value (1) for reproducibility
list-of-str par: list with parameter(s) to extract from GW posteriors
int n_samples: number of samples for (random) downsampling. Default -1: all samples
str cosmology: set of cosmological parameters (Planck18 or Planck15)
str waveform: waveform family to be used ('combined', 'seob', 'imr')
double snr_threhsold: SNR threshold for event filtering. For injection analysis only.
double far_threshold: FAR threshold for event filtering. For injection analysis only.
bool verbose: show progress bar
bool likelihood: resample to get likelihood samples
Returns:
np.ndarray: samples
np.ndarray: names
'''
folder = Path(path).resolve()
event_files = [Path(folder,f) for f in folder.glob('[!.]*')] # Ignores hidden files
events = []
names = []
n_events = len(event_files)
removed_snr = False
if volume:
par = ['ra', 'dec', 'luminosity_distance']
if n_events == 0:
raise FIGAROException("Empty folder")
for event in tqdm(event_files, desc = 'Loading events', disable = not(verbose)):
if seed:
rdstate = np.random.RandomState(seed = 1)
else:
rdstate = np.random.RandomState()
name, ext = str(event).split('/')[-1].split('.')
names.append(name)
if ext not in supported_extensions:
raise TypeError("File {0}.{1} is not supported".format(name, ext))
if ext not in ['h5', 'hdf5']:
if par is not None:
warnings.warn("Par names (or volume keyword) are ignored for .txt/.dat/.csv files")
if n_samples > -1:
samples = np.atleast_1d(np.loadtxt(event))
s = int(min([n_samples, len(samples)]))
samples_subset = samples[rdstate.choice(np.arange(len(samples)), size = s, replace = False)]
if len(np.shape(samples_subset)) == 1:
samples_subset = np.atleast_2d(samples_subset).T
events.append(samples_subset)
else:
samples = np.atleast_1d(np.loadtxt(event))
if len(np.shape(samples)) == 1:
samples = np.atleast_2d(samples).T
events.append(samples)
else:
# Check that a list of parameters is passed
if par is None:
raise TypeError("Please provide a list of parameter names you want to load (e.g. ['m1']).")
# Check that all the parametes are loadable
if not np.all([p in GW_par.keys() for p in par]):
wrong_pars = [p for p in par if p not in GW_par.keys()]
raise FIGAROException("The following parameters are not implemented: "+", ".join(wrong_pars)+". Run figaro.load.available_gw_pars() for a list of available parameters.")
# If everything is ok, load the samples
else:
out = _unpack_gw_posterior(event, par = par, n_samples = n_samples, cosmology = cosmology, rdstate = rdstate, waveform = waveform, snr_threshold = snr_threshold, far_threshold = far_threshold, likelihood = likelihood)
if out is not None:
if out.shape[-1] == len(par):
events.append(out)
elif 'snr' in par:
removed_snr = True
names.remove(name)
else:
names.remove(name)
if removed_snr:
warnings.warn("At least one event does not have SNR samples. These events cannot be loaded for this parameter choices.")
return (events, np.array(names))
def _unpack_gw_posterior(event, par, cosmology, rdstate, n_samples = -1, waveform = 'combined', snr_threshold = None, far_threshold = None, likelihood = False):
'''
Reads data from .h5/.hdf5 GW posterior files.
For GWTC-3 data release, it uses by default the Mixed posterior samples.
Not all parameters are implemented: run figaro.load.available_gw_pars() for a list of available parameters.
The waveform argument allows the user to select a waveform family. The default value, 'combined' uses samples from both imr and seob waveforms.
For SEOB waveforms, the following waveform models are used (in descending priority order):
* SEOBNRv4PHM
* SEOBNRv4P
* SEOBNRv4
For IMR waveforms, in descending order:
* IMRPhenomXPHM
* IMRPhenomPv2
* IMRPhenomPv3HM
Arguments:
str event: file to read
list-of-str par: parameter to extract
str cosmology: set of cosmological parameters to use.
np.random.rdstate rdstate: state for random number generation
int n_samples: number of samples for (random) downsampling. Default -1: all samples
str waveform: waveform family to be used ('combined', 'imr', 'seob')
double snr_threhsold: SNR threshold for event filtering. For injection analysis only.
double far_threshold: FAR threshold for event filtering. For injection analysis only.
bool likelihood: resample to get likelihood samples
Returns:
np.ndarray: samples
'''
if cosmology == 'Planck18':
omega = Planck18
elif cosmology == 'Planck15':
omega = Planck15
else:
raise FIGAROException("Cosmology not supported")
if waveform not in supported_waveforms:
raise FIGAROException("Unknown waveform: please use 'combined' (default), 'imr' or 'seob'")
if far_threshold is not None and snr_threshold is not None:
warnings.warn("Both FAR and SNR threshold provided. FAR will be used.")
snr_threshold = None
if far_threshold is not None:
if 'far' not in par:
par = np.append(par, 'far')
elif snr_threshold is not None:
if 'snr' not in par:
par = np.append(par, 'snr')
with h5py.File(Path(event), 'r') as f:
samples = []
loaded_pars = []
flag_filter = False
try:
try:
# LVK injections
try:
data = f['MDC']['posterior_samples']
# Playground
except:
data = f['posterior_samples']
MDC_flag = True
# GWTC-2, GWTC-3
except:
MDC_flag = False
if waveform == 'combined':
# GWTC-2
try:
data = f['PublicationSamples']['posterior_samples']
# GWTC-3
except KeyError:
try:
data = f['C01:Mixed']['posterior_samples']
except:
try:
data = f['IMRPhenomXPHM']['posterior_samples']
except:
data = f['SEOBNRv4PHM']['posterior_samples']
else:
if waveform == 'imr':
try:
try:
data = f['C01:IMRPhenomXPHM']['posterior_samples']
except:
data = f['IMRPhenomXPHM']['posterior_samples']
except:
try:
try:
data = f['C01:IMRPhenomPv2']['posterior_samples']
except:
data = f['IMRPhenomPv2']['posterior_samples']
except:
try:
try:
data = f['C01:IMRPhenomPv3HM']['posterior_samples']
except:
data = f['IMRPhenomPv3HM']['posterior_samples']
except:
try:
try:
data = f['C01:IMRPhenomXPHM:LowSpin']['posterior_samples']
except:
data = f['IMRPhenomXPHM:LowSpin']['posterior_samples']
except:
try:
data = f['C01:IMRPhenomPv2_NRTidal-LS']['posterior_samples']
except:
data = f['IMRPhenomPv2_NRTidal-LS']['posterior_samples']
if waveform == 'seob':
try:
try:
data = f['C01:SEOBNRv4PHM']['posterior_samples']
except:
data = f['SEOBNRv4PHM']['posterior_samples']
except:
try:
try:
data = f['C01:SEOBNRv4P']['posterior_samples']
except:
data = f['SEOBNRv4P']['posterior_samples']
except:
try:
try:
data = f['C01:SEOBNRv4']['posterior_samples']
except:
data = f['SEOBNRv4']['posterior_samples']
except:
try:
data = f['C01:SEOBNRv4T_surrogate_LS']['posterior_samples']
except:
data = f['SEOBNRv4T_surrogate_LS']['posterior_samples']
for name, lab in zip(GW_par.keys(), GW_par.values()):
if name in par:
if name == 'snr':
try:
if MDC_flag or waveform != 'combined':
snr = np.array(data[lab])
samples.append(data[lab])
if snr_threshold is not None:
flag_filter = True
except:
warnings.warn("SNR filter is not available with this dataset.")
elif name == 'far' and far_threshold is not None:
try:
flag_filter = True
far = np.array(data[lab])
except:
warnings.warn("FAR filter is not available with this dataset.")
elif name == 's1':
try:
samples.append(data[lab])
except:
samples.append(np.sqrt(data['spin_1x']**2+data['spin_1y']**2+data['spin_1z']**2))
elif name == 's2':
try:
samples.append(data[lab])
except:
samples.append(np.sqrt(data['spin_2x']**2+data['spin_2y']**2+data['spin_2z']**2))
elif name == 'luminosity_distance':
try:
samples.append(data[lab])
except:
samples.append(np.exp(data['logdistance']))
else:
samples.append(data[lab])
loaded_pars.append(name)
if len(par) == 1:
samples = np.atleast_2d(samples).T
else:
par = np.array(par)
loaded_pars = np.array(loaded_pars)
samples_loaded = np.array(samples)
samples = []
for pi in par:
if not (pi == 'far' or (pi == 'snr' and flag_filter)):
samples.append(samples_loaded[np.where(loaded_pars == pi)[0]].flatten())
samples = np.array(samples)
if flag_filter:
if far_threshold is not None:
samples = samples[:, np.where((far < far_threshold) & (far > 0))[0]]
if snr_threshold is not None:
samples = samples[:, np.where(snr > snr_threshold)[0]]
samples = samples.T
if likelihood:
inv_prior = 1./_prior_gw(par, data, cosmology)
h = np.random.uniform(0,1, len(samples))
samples = samples[h < inv_prior/np.max(inv_prior)]
if n_samples > -1:
s = int(min([n_samples, len(samples)]))
return samples[rdstate.choice(np.arange(len(samples)), size = s, replace = False)]
else:
return samples
# GWTC-1
except KeyError:
if waveform == 'combined':
label = 'Overall_posterior'
elif waveform == 'imr':
label = 'IMRPhenomPv2_posterior'
elif waveform == 'seob':
label = 'SEOBNRv3_posterior'
try:
data = f[label]
except KeyError:
try:
# GW170817
data = f['IMRPhenomPv2NRT_lowSpin_posterior']
except:
print("Skipped event {0} (not loadable yet)".format(Path(event).parts[-1].split('.')[0]))
return None
# Provided quantities
names_GWTC1 = {'m1_detect' : 'm1_detector_frame_Msun',
'm2_detect' : 'm2_detector_frame_Msun',
'ra' : 'right_ascension',
'dec' : 'declination',
'luminosity_distance': 'luminosity_distance_Mpc',
'cos_theta_jn' : 'costheta_jn',
's1' : 'spin1',
's2' : 'spin2',
'cos_tilt_1' : 'costilt1',
'cos_tilt_2' : 'costilt2',
}
ss = {name: data[lab] for name, lab in zip(names_GWTC1.keys(), names_GWTC1.values())}
# Derived quantities
ss['z'] = omega.Redshift(ss['luminosity_distance'])
ss['m1'] = ss['m1_detect']/(1+ss['z'])
ss['m2'] = ss['m2_detect']/(1+ss['z'])
ss['mc'] = (ss['m1']*ss['m2'])**(3./5.)/(ss['m1']+ss['m2'])**(1./5.)
ss['mt'] = ss['m1']+ss['m2']
ss['q'] = ss['m2']/ss['m1']
ss['eta'] = ss['m1']*ss['m2']/(ss['m1']+ss['m2'])**2
ss['s1z'] = ss['s1']*ss['cos_tilt_1']
ss['s2z'] = ss['s2']*ss['cos_tilt_2']
ss['chi_eff'] = (ss['s1z'] + ss['q']*ss['s2z'])/(1+ss['q'])
for name in GW_par.keys():
if name in par:
if not (name == 'snr' or name == 'far'):
samples.append(ss[name])
loaded_pars.append(name)
if len(par) == 1:
samples = np.atleast_2d(samples).T
else:
par = np.array(par)
loaded_pars = np.array(loaded_pars)
samples_loaded = np.array(samples)
samples = []
for pi in par:
if not (pi == 'snr' or pi == 'far'):
samples.append(samples_loaded[np.where(loaded_pars == pi)[0]].flatten())
samples = np.array(samples)
samples = samples.T
if likelihood:
inv_prior = 1./_prior_gw(par, ss, cosmology)
h = np.random.uniform(0,1, len(samples))
samples = samples[h < inv_prior/np.max(inv_prior)]
if n_samples > -1:
s = int(min([n_samples, len(samples)]))
return samples[rdstate.choice(np.arange(len(samples)), size = s, replace = False)]
else:
return samples
def _prior_gw(par, samples, cosmology = 'Planck18'):
"""
GW prior parameters, following https://docs.ligo.org/RatesAndPopulations/gwpopulation_pipe/_modules/gwpopulation_pipe/data_collection.html#evaluate_prior
Arguments:
np.ndarray samples: posterior samples
list-of-str par: parameter(s)
str cosmology: cosmological parameters.
Returns:
np.ndarray: prior values
"""
if cosmology == 'Planck18':
omega = Planck18
dVdz = dVdz_approx_planck18
elif cosmology == 'Planck15':
omega = Planck15
dVdz = dVdz_approx_planck15
else:
raise FIGAROException("Cosmology not supported")
vol = omega.ComovingVolume(2.5)/1e9
prior = np.ones(len(samples))
# Redshift prior (uniform in comoving source frame)
if 'z' in par:
prior *= dVdz(samples[GW_par['z']])/((1.+samples[GW_par['z']])*vol)
# Mass prior (uniform in detector-frame component masses)
n_mass_pars = np.sum([item in par for item in ['m1','m2','mc','q']])
if n_mass_pars > 0:
prior *= (1+samples[GW_par['z']])**np.min([n_mass_pars, 2])
if 'q' in par:
prior *= samples[GW_par['m1']]
if ('mc' in par or 'mc_detect' in par):
prior *= (1 + samples[GW_par['q']]) ** 0.2 / samples[GW_par['q']] ** 0.6
return prior
[docs]
def save_density(draws, folder = '.', name = 'density', ext = 'json'):
"""
Exports a list of figaro.mixture instances to file
Arguments:
list draws: list of mixtures to be saved
str or Path folder: folder in which the output file will be saved
str name: name to be given to output file
str ext: file extension (pkl or json)
"""
if ext == 'pkl':
with open(Path(folder, name+'.pkl'), 'wb') as f:
dill.dump(draws, f)
elif ext == 'json':
if len(np.shape(draws)) == 1:
draws = np.atleast_2d(draws)
ll = []
for draws_i in draws:
list_of_dicts = [dr.__dict__.copy() for dr in draws_i]
for density in list_of_dicts:
if 'components' in density.keys():
density.pop('components')
for key in density.keys():
value = density[key]
if isinstance(value, np.ndarray):
value = value.tolist()
else:
value = float(value)
density[key] = value
ll.append(list_of_dicts)
s = json.dumps(ll)
with open(Path(folder, name + '.json'), 'w') as f:
json.dump(s, f)
else:
raise FIGAROException("Extension {0} is not supported. Valid extensions are pkl or json.")
[docs]
def load_density(path, make_comp = True):
"""
Loads a list of figaro.mixture instances from path.
If the requested file extension (pkl or json) is not available, it tries loading the other.
Arguments:
str or Path path: path with draws (file or folder)
bool make_comp: make component objects
Returns:
list: figaro.mixture object instances
"""
path = Path(path)
if path.is_file():
return _load_density_file(path, make_comp)
else:
files = [_load_density_file(file, make_comp) for file in path.glob('*.[jp][sk][ol]*') if not file.stem == 'posteriors_single_event']
if len(files) > 0:
return files
else:
raise FIGAROException("Density file(s) not found")
def _load_density_file(file, make_comp = True):
"""
Loads a list of figaro.mixture instances from file.
If the requested file extension (pkl or json) is not available, it tries loading the other.
Arguments:
str or Path file: file with draws
bool make_comp: make component objects
Returns
list: figaro.mixture object instances
"""
file = Path(file)
ext = file.suffix
if ext == '.pkl':
try:
return _load_pkl(file)
except FileNotFoundError:
try:
return _load_json(file.with_suffix('.json'), make_comp)
except FileNotFoundError:
raise FIGAROException("{0} not found. Please provide it or re-run the inference.".format(file.name))
elif ext == '.json':
try:
return _load_json(file, make_comp)
except FileNotFoundError:
try:
return _load_pkl(file.with_suffix('.pkl'))
except FileNotFoundError:
raise FIGAROException("{0} not found. Please provide it or re-run the inference.".format(file.name))
else:
raise FIGAROException("Extension {0} is not supported. Please provide .pkl or .json file.".format(file.suffix))
def _load_pkl(file):
"""
Loads a list of figaro.mixture instances from pkl file
Arguments:
str or Path file: file with draws
Returns
list: figaro.mixture object instances
"""
with open(file, 'rb') as f:
draws = dill.load(f)
return draws
def _load_json(file, make_comp = True):
"""
Loads a list of figaro.mixture instances from json file
Arguments:
str or Path file: file with draws
bool make_comp: make component objects
Returns
list: figaro.mixture object instances
"""
with open(Path(file), 'r') as fjson:
dictjson = json.loads(json.load(fjson))
ll = []
for list_of_dict in dictjson:
draws = []
for dict_ in list_of_dict:
dict_.pop('log_w')
for key in dict_.keys():
value = dict_[key]
if isinstance(value, list):
dict_[key] = np.array(value)
if key == 'probit':
dict_[key] = bool(value)
draws.append(mixture(**dict_, make_comp = make_comp))
ll.append(draws)
if len(ll) == 1:
return ll[0]
return ll
[docs]
def load_selection_function(file, par = None, far_threshold = 1):
"""
Loads the selection function, either from a python module containing a method called 'selection_function' or via injections.
If injections, it assumes that the last column of a txt/csv/dat file contains the sampling pdf.
Arguments:
str or Path file: selection function file
list-of-str par: list with parameter(s) to extract from GW posteriors
double threshold: FAR threshold to filter LVK injections
Returns:
np.ndarray or callable: detected samples or callable with approximant
np.ndarray or NoneType: injection pdf (for samples) or None (for approximant)
int or NoneType: total number of injections
double duration: duration of the observation
"""
file = Path(file)
ext = file.parts[-1].split('.')[1]
if ext not in supported_extensions + ['py']:
raise FIGAROException("Selection function file not supported")
if ext == 'py':
selfunc_file_name = file.parts[-1].split('.')[0]
spec = importlib.util.spec_from_file_location(selfunc_file_name, file)
selfunc_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(selfunc_module)
selfunc = selfunc_module.selection_function
inj_pdf = None
n_total_inj = None
try:
duration = selfunc_module.duration
except:
duration = 1.
else:
if ext not in ['h5','hdf5']:
samples = np.loadtxt(file)
det_idx = samples[:,-1]
selfunc = samples[:,:-2][det_idx == 1]
inj_pdf = samples[:,-2][det_idx == 1]
n_total_inj = len(samples)
duration = 1.
else:
selfunc, inj_pdf, n_total_inj, duration = _unpack_injections(file, par, far_threshold)
return selfunc, inj_pdf, n_total_inj, duration
def _unpack_injections(file, par, far_threshold = 1.):
"""
Reads data from .h5/.hdf5 injection file (https://zenodo.org/records/7890437).
A sample is considered detected if at least one of the searches calls a detection.
Arguments:
str event: file to read
str par: parameter to extract
double far_threshold: FAR threshold for injection filtering
Returns:
np.ndarray: samples
np.ndarray: injection pdf
int or NoneType: total number of injections
double duration: duration of the observation
"""
# Check that a list of parameters is passed
if par is None:
raise TypeError("Please provide a list of parameter names you want to load (e.g. ['m1']).")
# Check that all the parametes are loadable
if not np.all([p in loadable_inj_pars for p in par]):
wrong_pars = [p for p in par if p not in loadable_inj_pars]
raise FIGAROException("The following parameters are not implemented: "+", ".join(wrong_pars))
with h5py.File(file, 'r') as f:
data = f['injections']
n_total_inj = int(data.attrs['total_generated'])
duration = data.attrs['analysis_time_s']/(60.*60.*24.*365) # Years
# Detected injections
far_cwb = np.array(data['far_cwb'])
far_gstlal = np.array(data['far_gstlal'])
far_mbta = np.array(data['far_mbta'])
try:
far_pycbc = np.array(data['far_pycbc_bbh'])
except:
far_pycbc = np.array(data['far_pycbc_hyperbank'])
idx = np.where((far_cwb < far_threshold) | (far_gstlal < far_threshold) | (far_mbta < far_threshold) | (far_pycbc < far_threshold))[0]
# Load samples
samples = np.zeros((len(par), len(idx)))
inj_pdf = np.ones(len(idx))
# Parameters
m1 = np.array(data[inj_par['m1']])[idx]
m2 = np.array(data[inj_par['m2']])[idx]
q = m2/m1
s1x = np.array(data[inj_par['s1x']])[idx]
s1y = np.array(data[inj_par['s1y']])[idx]
s1z = np.array(data[inj_par['s1z']])[idx]
s2x = np.array(data[inj_par['s2x']])[idx]
s2y = np.array(data[inj_par['s2y']])[idx]
s2z = np.array(data[inj_par['s2z']])[idx]
for i, lab in enumerate(par):
# Already available parameters:
if lab in injected_pars:
if not lab == 'cos_theta_jn':
samples[i] = np.array(data[inj_par[lab]])[idx]
else:
samples[i] = np.cos(np.array(data[inj_par[lab]])[idx])
else:
# Masses
if lab == 'q':
samples[i] = q
if lab == 'mt':
samples[i] = m1 + m2
if lab == 'mc':
samples[i] = m1*m2**(3./5.)/(m1+m2)**(1./5.)
# Spins
if lab == 's1':
samples[i] = np.sqrt(s1x**2+s1y**2+s1z**2)
if lab == 's2':
samples[i] = np.sqrt(s2x**2+s2y**2+s2z**2)
if lab == 'chi_eff':
samples[i] = (s1z + q*s2z)/(1+q)
if lab == 'chi_p':
samples[i] = np.maximum(np.sqrt(s1x**2+s1y**2), np.sqrt(s2x**2+s2y**2)*q*(4*q+3)/(4+3*q))
# Sampling pdf
n_mass_pars = len([lab for lab in par if lab in mass_parameters])
n_spin_pars = len([lab for lab in par if lab in spin_parameters])
# Masses
if n_mass_pars == 1:
inj_pdf *= np.array(data['mass1_source_sampling_pdf'])[idx]
elif n_mass_pars == 2:
inj_pdf *= np.array(data['mass1_source_mass2_source_sampling_pdf'])[idx]
if 'q' in par:
inj_pdf *= m1
# Spins
if n_spin_pars > 0:
inj_pdf *= (np.array(data['spin1x_spin1y_spin1z_sampling_pdf'])[idx]*np.array(data['spin2x_spin2y_spin2z_sampling_pdf'])[idx])**(n_spin_pars/6.)
if 's1' in par or 'chi_eff' in par or 'chi_p' in par:
inj_pdf /= 2*np.pi*(s1x**2+s1y**2+s1z**2)
if 's2' in par or 'chi_eff' in par or 'chi_p' in par:
inj_pdf /= 2*np.pi*(s2x**2+s2y**2+s2z**2)
# Distance
if 'z' in par:
inj_pdf *= np.array(data['redshift_sampling_pdf'])[idx]
# Sky position
if 'ra' in par:
inj_pdf *= np.array(data['right_ascension_sampling_pdf'])[idx]
if 'dec' in par:
inj_pdf *= np.array(data['declination_sampling_pdf'])[idx]
return samples.T, inj_pdf, n_total_inj, duration