import numpy as np
import sys
import dill
from collections import Counter
from pathlib import Path
from scipy.special import gammaln, logsumexp
from scipy.stats import multivariate_normal as mn
from scipy.stats import invwishart, norm, invgamma, dirichlet, gamma
from figaro.decorators import *
from figaro.transform import *
from figaro._numba_functions import *
from figaro._numba_functions import _mvn_logpdf
from figaro._likelihood import evaluate_mixture_MC_draws, evaluate_mixture_MC_draws_1d, log_norm
from figaro.exceptions import except_hook, FIGAROException
from figaro.utils import get_priors
from figaro.marginal import _condition, _marginalise
from numba import njit, prange
sys.excepthook = except_hook
#-----------#
# Functions #
#-----------#
@njit
def _student_t(df, t, mu, sigma, dim):
"""
Multivariate student-t pdf.
As in http://gregorygundersen.com/blog/2020/01/20/multivariate-t/
Arguments:
float df: degrees of freedom
float t: variable (2d array)
np.ndarray mu: mean (2d array)
np.ndarray sigma: variance
int dim: number of dimensions
Returns:
float: student_t(df).logpdf(t)
"""
vals, vecs = eigh_jit(sigma)
logdet = np.log(vals).sum()
U = vecs * np.sqrt(1./vals)
dev = t - mu
maha = np.sum(dot_jit(dev, U)**2, axis=-1)
x = 0.5 * (df + dim)
A = gammaln_jit(x)
B = gammaln_jit(0.5 * df)
C = dim/2. * np.log(df * np.pi)
D = 0.5 * logdet
E = -x * np.log1p((1./df) * maha)
return (A - B - C - D + E)[0]
@njit
def _update_alpha(alpha, n, K, alpha0, burnin = 1000):
"""
Update concentration parameter using a Metropolis-Hastings sampling scheme.
Arguments:
double alpha: Initial value for concentration parameter
int n: Number of samples
int K: Number of active clusters
int burnin: MH burnin
Returns:
double: new concentration parameter value
"""
a_old = alpha
n_draws = burnin+np.random.randint(100)
for i in prange(n_draws):
a_new = a_old + (np.random.random() - 0.5)*0.5
if a_new > 0.:
logP_old = gammaln_jit(a_old) - gammaln_jit(a_old + n) + (K+alpha0) * np.log(a_old) - a_old
logP_new = gammaln_jit(a_new) - gammaln_jit(a_new + n) + (K+alpha0) * np.log(a_new) - a_new
if logP_new - logP_old > np.log(np.random.random()):
a_old = a_new
return a_old
@njit
def _compute_t_pars(k, mu, nu, L, mean, S, N, dim):
"""
Compute parameters for student-t distribution.
Arguments:
double k: Normal std parameter (for NIW)
np.ndarray mu: Normal mean parameter (for NIW)
int nu: Inverse-Wishart df parameter (for NIW)
np.ndarray L: Inverse-Wishart scale matrix (for NIW)
np.ndarray mean: samples mean
np.ndarray S: samples covariance
int N: number of samples
int dim: number of dimensions
Returns:
int: degrees of fredom for student-t
np.ndarray: scale matrix for student-t
np.ndarray: mean for student-t
"""
# Update hyperparameters
k_n, mu_n, nu_n, L_n = _compute_hyperpars(k, mu, nu, L, mean, S, N)
# Update t-parameters
t_df = nu_n - dim + 1
t_shape = rescale_matrix(L_n, 1./((k_n+1.)/(k_n*t_df)))
return t_df, t_shape, mu_n
@njit
def _compute_hyperpars(k, mu, nu, L, mean, S, N):
"""
Update hyperparameters for Normal Inverse Gamma/Wishart (NIG/NIW).
See https://www.cs.ubc.ca/~murphyk/Papers/bayesGauss.pdf
Arguments:
double k: Normal std parameter (for NIG/NIW)
np.ndarray mu: Normal mean parameter (for NIG/NIW)
int nu: Gamma df parameter (for NIG/NIW)
np.ndarray L: Gamma scale matrix (for NIG/NIW)
np.ndarray mean: samples mean
np.ndarray S: samples covariance
int N: number of samples
Returns:
double: updated Normal std parameter (for NIG/NIW)
np.ndarray: updated Normal mean parameter (for NIG/NIW)
int: updated Gamma df parameter (for NIG/NIW)
np.ndarray: updated Gamma scale matrix (for NIG/NIW)
"""
k_n = k + N
mu_n = (mu*k + N*mean)/k_n
nu_n = nu + N
if N > 0:
L_n = L + S + rescale_matrix(outer_jit((mean - mu).T, (mean - mu)), k_n/(k*N))
else:
L_n = L + S
return k_n, mu_n, nu_n, L_n
@njit
def _compute_component_suffstats_add(x, mean, S, N, p_mu, p_k, p_nu, p_L):
"""
Update mean, covariance, number of samples and maximum a posteriori for mean and covariance.
Arguments:
np.ndarray x: sample to add
np.ndarray mean: mean of samples already in the cluster
np.ndarray cov: covariance of samples already in the cluster
int N: number of samples already in the cluster
np.ndarray p_mu: NIG Normal mean parameter
double p_k: NIG Normal std parameter
int p_nu: NIG Gamma df parameter
np.ndarray p_L: NIG Gamma scale matrix
Returns:
np.ndarray: updated mean
np.ndarray: updated covariance
int: updated number of samples
np.ndarray: mean (maximum a posteriori)
np.ndarray: covariance (maximum a posteriori)
"""
new_mean = (mean*N+x)/(N+1)
new_S = (S + rescale_matrix(outer_jit(mean,mean), 1./N) + outer_jit(x,x)) - rescale_matrix(outer_jit(new_mean, new_mean), 1./(N+1))
new_N = N+1
new_mu = ((p_mu*p_k + new_N*new_mean)/(p_k + new_N))[0]
new_sigma = rescale_matrix(p_L + new_S + rescale_matrix(outer_jit((new_mean - p_mu).T, (new_mean - p_mu)), (p_k + new_N)/(p_k*new_N)), (p_nu + new_N - x.shape[-1] - 1))
return new_mean, new_S, new_N, new_mu, new_sigma
@njit
def _compute_component_suffstats_remove(x, mean, S, N, p_mu, p_k, p_nu, p_L):
"""
Update mean, covariance, number of samples and maximum a posteriori for mean and covariance.
Arguments:
np.ndarray x: sample to remove
np.ndarray mean: mean of samples already in the cluster
np.ndarray cov: covariance of samples already in the cluster
int N: number of samples already in the cluster
np.ndarray p_mu: NIG Normal mean parameter
double p_k: NIG Normal std parameter
int p_nu: NIG Gamma df parameter
np.ndarray p_L: NIG Gamma scale matrix
Returns:
np.ndarray: updated mean
np.ndarray: updated covariance
int: updated number of samples
np.ndarray: mean (maximum a posteriori)
np.ndarray: covariance (maximum a posteriori)
"""
if N > 1:
new_mean = (mean*N-x)/(N-1)
if N > 2:
new_S = (S + rescale_matrix(outer_jit(mean,mean), 1./N) - outer_jit(x,x)) - rescale_matrix(outer_jit(new_mean, new_mean), 1./(N-1))
else:
new_S = np.zeros(S.shape)
else:
new_mean = np.zeros(mean.shape)
new_S = np.zeros(S.shape)
new_N = N-1
new_mu = ((p_mu*p_k + new_N*new_mean)/(p_k + new_N))[0]
if N > 2:
new_sigma = rescale_matrix(p_L + new_S + rescale_matrix(outer_jit((new_mean - p_mu).T, (new_mean - p_mu)), (p_k + new_N)/(p_k*new_N)), (p_nu + new_N - x.shape[-1] - 1))
else:
new_sigma = rescale_matrix(p_L, (p_nu + new_N - x.shape[-1] - 1))
return new_mean, new_S, new_N, new_mu, new_sigma
#-------------------#
# Auxiliary classes #
#-------------------#
class _prior:
"""
Class to store the NIW prior parameters
See https://www.cs.ubc.ca/~murphyk/Papers/bayesGauss.pdf, sec. 9
Arguments:
double k: Normal std parameter
np.ndarray L: Wishart scale matrix
int nu: Wishart df parameter
np.ndarray mu: Normal mean parameter
Returns:
prior: instance of prior class
"""
def __init__(self, k, L, nu, mu):
self.k = k
self.nu = np.max([nu, mu.shape[-1]+2])
self.L = rescale_matrix(L, 1./(self.nu-mu.shape[-1]-1))
self.mu = mu
class _component:
"""
Class to store the relevant informations for each component in the mixture.
Arguments:
np.ndarray x: sample added to the new component
prior prior: instance of the prior class with NIG/NIW prior parameters
Returns:
component: instance of component class
"""
def __init__(self, x, prior, N = 1):
self.N = N
self.mean = x
self.S = np.identity(x.shape[-1])*0.
self.mu = np.atleast_2d((prior.mu*prior.k + self.N*self.mean)/(prior.k + self.N)).astype(np.float64)[0]
self.sigma = rescale_matrix(prior.L, (prior.nu - x.shape[-1] - 1))
class _component_h:
"""
Class to store the relevant informations for each component in the mixture.
To be used in hierarchical inference.
Arguments:
np.ndarray x: event added to the new component
int dim: number of dimensions
prior prior: instance of the prior class with NIG/NIW prior parameters
double logL_D: logLikelihood denominator
Returns:
component_h: instance of component_h class
"""
def __init__(self, x, dim, prior, logL_D, mu_MC, sigma_MC, log_alpha_factor):
self.dim = dim
self.N = 1
self.logL_D = logL_D
log_norm_D = logsumexp_jit(logL_D)
idx = np.random.choice(len(mu_MC), p = np.exp(logL_D - log_norm_D))
self.mu = np.copy(mu_MC[idx])
self.sigma = np.copy(sigma_MC[idx])
if dim == 1:
self.mu = np.atleast_2d(self.mu).T
self.sigma = np.atleast_2d(self.sigma).T
self.log_N_true = np.log(self.N) - log_alpha_factor[idx]
[docs]
class density:
"""
Class to initialise a common set of methods for mixture models. Not to be used.
"""
def __init__(self):
pass
def __call__(self, x):
return self.pdf(x)
[docs]
def pdf(self, x):
if self.n_cl == 0:
raise FIGAROException("You are trying to evaluate an empty mixture.\n If you are using the density_from_samples() method, you may want to evaluate the output of that method.")
if len(np.shape(x)) < 2:
if self.dim == 1:
x = np.atleast_2d(x).T
else:
x = np.atleast_2d(x)
with np.errstate(divide = 'ignore', invalid = 'ignore'):
p = np.nan_to_num(self._pdf(x), nan = 0.)
return p
[docs]
def logpdf(self, x):
if self.n_cl == 0:
raise FIGAROException("You are trying to evaluate an empty mixture.\n If you are using the density_from_samples() method, you may want to evaluate the output of that method.")
if len(np.shape(x)) < 2:
if self.dim == 1:
x = np.atleast_2d(x).T
else:
x = np.atleast_2d(x)
with np.errstate(divide = 'ignore', invalid = 'ignore'):
logp = np.nan_to_num(self._logpdf(x), nan = -np.inf)
return logp
@probit
def _pdf(self, x):
"""
Evaluate mixture at point(s) x
Arguments:
np.ndarray x: point(s) to evaluate the mixture at
Returns:
np.ndarray: mixture.pdf(x)
"""
return self._pdf_probit(x) * np.exp(-probit_logJ(x, self.bounds, self.probit))
@probit
def _logpdf(self, x):
"""
Evaluate log mixture at point(s) x
Arguments:
np.ndarray x: point(s) to evaluate the mixture at
Returns:
np.ndarray: mixture.logpdf(x)
"""
return self._logpdf_probit(x) - probit_logJ(x, self.bounds, self.probit)
[docs]
def fast_pdf(self, x):
"""
Fast pdf evaluation using FIGARO implementation of log_norm (JIT) rather than Numpy's.
WARNING: it is meant to be used with MCMC samplers, therefore accepts only one point at a time.
Arguments:
np.ndarray x: point to evaluate the mixture at
Returns:
np.ndarray: mixture.pdf(x)
"""
x = np.atleast_1d(x)
if x.shape == (1,self.dim):
return self._fast_pdf(x[0])
elif x.shape == (self.dim,):
return self._fast_pdf(x)
else:
raise FIGAROException("Please provide one point at a time.")
[docs]
def fast_logpdf(self, x):
"""
Fast logpdf evaluation using FIGARO implementation of log_norm (JIT) rather than Numpy's.
WARNING: it is meant to be used with MCMC samplers, therefore accepts only one point at a time.
Arguments:
np.ndarray x: point to evaluate the mixture at
Returns:
np.ndarray: mixture.pdf(x)
"""
x = np.atleast_1d(x)
if x.shape == (1,self.dim):
return self._fast_logpdf(x[0])
elif x.shape == (self.dim,):
return self._fast_logpdf(x)
else:
raise FIGAROException("Please provide one point at a time.")
@probit
def _fast_pdf(self, x):
"""
Evaluate mixture at point x
Arguments:
np.ndarray x: point to evaluate the mixture at
Returns:
np.ndarray: mixture.pdf(x)
"""
return self._fast_pdf_probit(x) * np.exp(-probit_logJ(x, self.bounds, self.probit))
@probit
def _fast_logpdf(self, x):
"""
Evaluate log mixture at point x
Arguments:
np.ndarray x: point to evaluate the mixture at
Returns:
np.ndarray: mixture.logpdf(x)
"""
return self._fast_logpdf_probit(x) - probit_logJ(x, self.bounds, self.probit)
def _fast_pdf_probit(self, x):
"""
Evaluate mixture at point x in probit space
Arguments:
np.ndarray x: point to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.pdf(x)
"""
return np.sum(self.w*_mvn_pdf(x, self.means, self.covs, self.inv_covs, self.det_covs))
def _fast_logpdf_probit(self, x):
"""
Evaluate log mixture at point x in probit space
Arguments:
np.ndarray x: point to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.logpdf(x)
"""
return logsumexp(self.log_w+_mvn_logpdf(x, self.means, self.covs, self.inv_covs, self.det_covs))
@probit
def _pdf_no_jacobian(self, x):
"""
Evaluate mixture at point(s) x without jacobian
Arguments:
np.ndarray x: point(s) to evaluate the mixture at
Returns:
np.ndarray: mixture.pdf(x)
"""
return self._pdf_probit(x)
def _pdf_probit(self, x):
"""
Evaluate mixture at point(s) x in probit space
Arguments:
np.ndarray x: point(s) to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.pdf(x)
"""
return np.sum(np.array([w*mn(mean, cov, allow_singular = True).pdf(x) for mean, cov, w in zip(self.means, self.covs, self.w)]), axis = 0)
@probit
def _pdf_array(self, x):
"""
Evaluate every mixture component at point(s) x.
Arguments:
np.ndarray x: point(s) to evaluate the components at
Returns:
np.ndarray: component.pdf(x) for each mixture component
"""
return _pdf_array_probit(x) * np.exp(-probit_logJ(x, self.bounds, self.probit))
def _pdf_array_probit(self, x):
"""
Evaluate every mixture component at point(s) x.
Arguments:
np.ndarray x: point(s) to evaluate the components at (in probit space)
Returns:
np.ndarray: component.pdf(x) for each mixture component
"""
return np.array([w*mn(mean, cov, allow_singular = True).pdf(x) for mean, cov, w in zip(self.means, self.covs, self.w)])
@probit
def _fast_pdf_array(self, x):
"""
Evaluate every mixture component at point(s) x.
Arguments:
np.ndarray x: point(s) to evaluate the components at
Returns:
np.ndarray: component.pdf(x) for each mixture component
"""
return _fast_pdf_array_probit(x) * np.exp(-probit_logJ(x, self.bounds, self.probit))
def _fast_pdf_array_probit(self, x):
"""
Evaluate every mixture component at point(s) x.
Arguments:
np.ndarray x: point(s) to evaluate the components at (in probit space)
Returns:
np.ndarray: component.pdf(x) for each mixture component
"""
return self.w*_mvn_pdf(x, self.means, self.covs, self.inv_covs, self.det_covs)
@probit
def _logpdf_no_jacobian(self, x):
"""
Evaluate log mixture at point(s) x without jacobian
Arguments:
np.ndarray x: point(s) to evaluate the mixture at
Returns:
np.ndarray: mixture.logpdf(x)
"""
return self._logpdf_probit(x)
def _logpdf_probit(self, x):
"""
Evaluate log mixture at point(s) x in probit space
Arguments:
np.ndarray x: point(s) to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.logpdf(x)
"""
return logsumexp(np.array([w + mn(mean, cov, allow_singular = True).logpdf(x) for mean, cov, w in zip(self.means, self.covs, self.log_w)]), axis = 0)
[docs]
def cdf(self, x):
if self.dim > 1:
raise FIGAROException("cdf is provided only for 1-dimensional distributions")
if len(np.shape(x)) < 2:
x = np.atleast_2d(x).T
return self._cdf(x)
[docs]
def logcdf(self, x):
if self.dim > 1:
raise FIGAROException("cdf is provided only for 1-dimensional distributions")
if len(np.shape(x)) < 2:
x = np.atleast_2d(x).T
return self._logcdf(x)
@probit
def _cdf(self, x):
"""
Evaluate mixture cdf at point(s) x
Arguments:
np.ndarray x: point(s) to evaluate the mixture at
Returns:
np.ndarray: mixture.cdf(x)
"""
return np.sum(np.array([w*norm(mean[0], cov[0,0]).cdf(x) for mean, cov, w in zip(self.means, np.sqrt(self.covs), self.w)]), axis = 0).flatten()
@probit
def _logcdf(self, x):
"""
Evaluate mixture log cdf at point(s) x
Arguments:
np.ndarray x: point(s) to evaluate the mixture at
Returns:
np.ndarray: mixture.logcdf(x)
"""
return logsumexp(np.array([w + norm(mean[0], cov[0,0]).logcdf(x) for mean, cov, w in zip(self.means, np.sqrt(self.covs), self.log_w)]), axis = 0).flatten()
@from_probit
def rvs(self, size = 1):
"""
Draw samples from mixture
Arguments:
int size: number of samples to draw
Returns:
np.ndarray: samples
"""
if self.n_cl == 0:
raise FIGAROException("You are trying to draw samples from an empty mixture.\n If you are using the density_from_samples() method, you may want to draw samples from the output of that method.")
return self._rvs_probit(size)
def _rvs_probit(self, size = 1):
"""
Draw samples from mixture in probit space
Arguments:
int size: number of samples to draw
Returns:
np.ndarray: samples in probit space
"""
idx = np.random.choice(np.arange(self.n_cl), p = self.w, size = size)
ctr = Counter(idx)
if self.dim > 1:
samples = np.empty(shape = (1,self.dim))
for i, n in zip(ctr.keys(), ctr.values()):
samples = np.concatenate((samples, np.atleast_2d(mn(self.means[i], self.covs[i], allow_singular = True).rvs(size = n))))
else:
samples = np.array([np.zeros(1)])
for i, n in zip(ctr.keys(), ctr.values()):
samples = np.concatenate((samples, np.atleast_2d(mn(self.means[i], self.covs[i], allow_singular = True).rvs(size = n)).T))
return np.array(samples[1:])
[docs]
def gradient(self, x):
"""
Gradient of the mixture.
Arguments:
np.ndarray x: point to evaluate the gradient at
Returns:
np.ndarray: gradient
"""
if self.n_cl == 0:
raise FIGAROException("You are trying to evaluate an empty mixture.\n If you are using the density_from_samples() method, you may want to evaluate the output of that method.")
if len(np.shape(x)) < 2:
if self.dim == 1:
x = np.atleast_2d(x).T
else:
x = np.atleast_2d(x)
with np.errstate(divide = 'ignore', invalid = 'ignore'):
g = np.nan_to_num([self._gradient(xi) for xi in x], nan = 0.)
return g
[docs]
def log_gradient(self, x):
"""
Logarithmic gradient of the mixture.
Arguments:
np.ndarray x: point to evaluate the gradient at
Returns:
np.ndarray: logarithmic gradient
"""
if self.n_cl == 0:
raise FIGAROException("You are trying to evaluate an empty mixture.\n If you are using the density_from_samples() method, you may want to evaluate the output of that method.")
if len(np.shape(x)) < 2:
if self.dim == 1:
x = np.atleast_2d(x).T
else:
x = np.atleast_2d(x)
with np.errstate(divide = 'ignore', invalid = 'ignore'):
g = np.nan_to_num([self._log_gradient(xi) for xi in x], nan = 0.)
return g
def _gradient(self, x):
"""
Gradient of the mixture.
Arguments:
np.ndarray x: point to evaluate the gradient at
Returns:
np.ndarray: gradient
"""
return self._pdf(x)*self._log_gradient(x)
@probit
def _log_gradient(self, x):
"""
Logarithmic gradient of the mixture.
Arguments:
np.ndarray x: point to evaluate the gradient at
Returns:
np.ndarray: logarithmic gradient
"""
p = self._pdf_array_probit(x)
J = np.exp(-probit_log_jacobian(x, self.bounds, self.probit))
B = np.array([-dot_jit(inv_jit(sigma),(x - mu))*J for mu, sigma in zip(self.means, self.covs)])
try:
return np.average(B, weights = p, axis = 0) + gradient_inv_jacobian(x, self.bounds, self.probit)*J
except ZeroDivisionError:
return np.zeros(x.shape[-1])
[docs]
class mixture(density):
"""
Class to store a single draw from DPGMM/(H)DPGMM.
Methods inherited from density class
Arguments:
iterable means: component means
iterable covs: component covariances
np.ndarray w: component weights
np.ndarray bounds: bounds of probit transformation
int dim: number of dimensions
int n_cl: number of clusters in the mixture
int n_pts: number of points used to infer the mixture
double alpha: concentration parameter
bool probit: whether to use the probit transformation or not
np.ndarray log_w: component log weights
bool make_comp: make component objects
double alpha_factor: evaluated \\int pdet(theta)p(theta|lambda) conditioned on the mixture parameters
Returns:
mixture: instance of mixture class
"""
def __init__(self, means, covs, w, bounds, dim, n_cl, n_pts, alpha = 1., probit = True, log_w = None, make_comp = True, alpha_factor = 1., **kwargs):
self.means = means
self.covs = covs
self.inv_covs = np.array([np.linalg.inv(c) for c in covs])
self.det_covs = np.array([np.linalg.det(c) for c in covs])
if make_comp:
self.components = [mn(mean, cov, allow_singular = True) for mean, cov in zip(self.means, self.covs)]
else:
self.components = None
if log_w is None:
self.w = w
self.log_w = np.log(w)
else:
self.log_w = log_w
self.w = np.exp(log_w)
self.bounds = bounds
self.dim = int(dim)
self.n_cl = int(n_cl)
self.n_pts = int(n_pts)
self.probit = probit
self.alpha = alpha
self.alpha_factor = alpha_factor
[docs]
def marginalise(self, axis = -1):
"""
Marginalise out one or more dimensions from the mixture.
Arguments:
int or list of int axis: axis to marginalise on. Default: last
Returns:
figaro.mixture.mixture: marginalised mixture
"""
return _marginalise(self, axis)
[docs]
def condition(self, vals, dims, norm = True, filter = True, tol = 1e-3):
"""
Mixture conditioned on specific values of a subset of parameters.
Arguments:
iterable vals: value(s) to condition on
int or list of int dims: dimension(s) associated with given vals (starting from 0)
bool norm: whether to normalize the distribution or not
bool filter: filter the components with weight < tol
double tol: tolerance on the sum of the weights
Returns:
figaro.mixture.mixture: conditioned mixture
"""
v = np.mean(self.bounds, axis = -1)
v[dims] = vals
return _condition(self, v, dims, norm, filter, tol = 1e-3)
def _logpdf_probit(self, x):
"""
Evaluate log mixture at point(s) x in probit space
Arguments:
np.ndarray x: point(s) to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.logpdf(x)
"""
if self.components is not None:
return logsumexp(np.array([w+comp.logpdf(x) for w, comp in zip(self.log_w, self.components)]), axis = 0)
else:
return super()._logpdf_probit(x)
def _pdf_probit(self, x):
"""
Evaluate mixture at point(s) x in probit space
Arguments:
np.ndarray x: point(s) to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.pdf(x)
"""
if self.components is not None:
return np.sum(np.array([w*comp.pdf(x) for w, comp in zip(self.w, self.components)]), axis = 0)
else:
return super()._pdf_probit(x)
def _pdf_array_probit(self, x):
"""
Evaluate every mixture component at point(s) x.
Arguments:
np.ndarray x: point(s) to evaluate the components at (in probit space)
Returns:
np.ndarray: component.pdf(x) for each mixture component
"""
if self.components is not None:
return np.array([w*comp.pdf(x) for w, comp in zip(self.w, self.components)])
else:
return super()._pdf_array_probit(x)
#-------------------#
# Inference classes #
#-------------------#
[docs]
class DPGMM(density):
"""
Class to infer a distribution given a set of samples.
Arguments:
iterable bounds: boundaries of the rectangle over which the distribution is defined. It should be in the format [[xmin, xmax],[ymin, ymax],...]
iterable prior_pars: NIW prior parameters (k, L, nu, mu)
double alpha0: initial guess for concentration parameter
bool probit: whether to use the probit transformation or not
int n_reassignments: number of reassignments. Default is not to reassign.
Returns:
DPGMM: instance of DPGMM class
"""
def __init__(self, bounds,
prior_pars = None,
alpha0 = 1.,
probit = True,
n_reassignments = 0.,
):
self.probit = probit
self.bounds = np.atleast_2d(bounds)
self.dim = len(self.bounds)
if prior_pars is not None:
self.prior = _prior(*prior_pars)
else:
self.prior = _prior(*get_priors(bounds = self.bounds, probit = self.probit))
self.alpha = alpha0
self.alpha_0 = alpha0
self.mixture = []
self.w = []
self.log_w = []
self.N_list = []
self.n_cl = 0
self.n_pts = 0
self.stored_pts = {}
self.assignations = {}
self.n_reassignments = int(n_reassignments)
def __call__(self, x):
return self.pdf(x)
[docs]
def initialise(self, prior_pars = None):
"""
Initialise the mixture to initial conditions.
Arguments:
iterable prior_pars: NIW prior parameters (k, L, nu, mu). If None, old parameters are kept
"""
self.alpha = self.alpha_0
self.mixture = []
self.w = []
self.log_w = []
self.N_list = []
self.n_cl = 0
self.n_pts = 0
self.stored_pts = {}
self.assignations = {}
if prior_pars is not None:
self.prior = _prior(*prior_pars)
def _add_datapoint_to_component(self, x, ss):
"""
Update component parameters after assigning a sample to a component
Arguments:
np.ndarray x: sample
component ss: component to update
Returns:
component: updated component
"""
new_mean, new_S, new_N, new_mu, new_sigma = _compute_component_suffstats_add(x, ss.mean, ss.S, ss.N, self.prior.mu, self.prior.k, self.prior.nu, self.prior.L)
ss.mean = new_mean
ss.S = new_S
ss.N = new_N
ss.mu = new_mu
ss.sigma = new_sigma
return ss
def _remove_datapoint_from_component(self, x, ss):
"""
Update component parameters after removing a sample from the component
Arguments:
np.ndarray x: sample
component ss: component to update
Returns:
component: updated component
"""
new_mean, new_S, new_N, new_mu, new_sigma = _compute_component_suffstats_remove(x, ss.mean, ss.S, ss.N, self.prior.mu, self.prior.k, self.prior.nu, self.prior.L)
ss.mean = new_mean
ss.S = new_S
ss.N = new_N
ss.mu = new_mu
ss.sigma = new_sigma
return ss
def _log_predictive_likelihood(self, x, ss):
"""
Compute log likelihood of drawing sample x from component ss given the samples that are already assigned to that component.
Arguments:
np.ndarray x: sample
component ss: component to update
Returns:
double: log Likelihood
"""
if ss is None:
ss = _component(self.prior.mu, prior = self.prior, N = 0)
t_df, t_shape, mu_n = _compute_t_pars(self.prior.k, self.prior.mu, self.prior.nu, self.prior.L, ss.mean, ss.S, ss.N, self.dim)
try:
return _student_t(df = t_df, t = x, mu = mu_n, sigma = t_shape, dim = self.dim)
except np.linalg.LinAlgError:
return -np.inf
def _cluster_assignment_distribution(self, x):
"""
Compute the marginal distribution of cluster assignment for each cluster.
Arguments:
np.ndarray x: sample
Returns:
dict: p_i for each component
"""
scores = np.zeros(self.n_cl+1)
for i in range(self.n_cl+1):
if i == 0:
ss = None
scores[i] = np.log(self.alpha)
else:
ss = self.mixture[i-1]
with np.errstate(divide = 'ignore', invalid = 'ignore'):
scores[i] = np.log(ss.N)
if np.isfinite(scores[i]):
scores[i] += self._log_predictive_likelihood(x, ss)
else:
scores[i] = -np.inf
norm = logsumexp_jit(scores)
return np.exp(scores - norm)
def _assign_to_cluster(self, x, pt_id = None):
"""
Assign the new sample x to an existing cluster or to a new cluster according to the marginal distribution of cluster assignment.
Arguments:
np.ndarray x: sample
int pt_id: point id
"""
if pt_id is None:
pt_id = len(list(self.stored_pts.keys()))-1
self.n_pts += 1
scores = self._cluster_assignment_distribution(x)
cid = np.random.choice(np.arange(-1,self.n_cl), p=scores)
if cid == -1:
self.mixture.append(_component(x, prior = self.prior))
self.N_list.append(1.)
self.n_cl += 1
self.assignations[pt_id] = int(self.n_cl)-1
else:
self.mixture[int(cid)] = self._add_datapoint_to_component(x, self.mixture[int(cid)])
self.N_list[int(cid)] += 1
self.assignations[pt_id] = int(cid)
# Update weights
self.w = np.array(self.N_list)
# Beware of empty clusters!
with np.errstate(divide = 'ignore', invalid = 'ignore'):
self.w = self.w/self.w.sum()
self.log_w = np.log(self.w)
return
def _remove_from_cluster(self, x, cid):
"""
Remove the sample x from its assigned cluster.
Arguments:
np.ndarray x: sample
int cid: cluster id
"""
self.mixture[int(cid)] = self._remove_datapoint_from_component(x, self.mixture[int(cid)])
self.N_list[int(cid)] -= 1
self.n_pts -= 1
# Update weights
self.w = np.array(self.N_list)
# Beware of empty clusters!
with np.errstate(divide = 'ignore', invalid = 'ignore'):
self.w = self.w/self.w.sum()
self.log_w = np.log(self.w)
return
[docs]
def density_from_samples(self, samples, make_comp = True):
"""
Reconstruct the probability density from a set of samples.
Arguments:
iterable samples: samples set
bool make_comp: whether to instantiate the scipy.stats.multivariate_normal components or not
Returns:
mixture: the inferred mixture
"""
np.random.shuffle(samples)
samples = np.ascontiguousarray(samples)
# Thermalise
for s in samples:
self.add_new_point(s)
# Random Gibbs walk (if required)
for id in np.random.choice(self.n_pts, size = self.n_reassignments, replace = True):
self._reassign_point(id)
d = self.build_mixture(make_comp)
self.initialise()
return d
@probit
def add_new_point(self, x):
"""
Update the probability density reconstruction adding a new sample
Arguments:
np.ndarray x: sample
"""
self.stored_pts[len(list(self.stored_pts.keys()))] = np.atleast_2d(x)
self._assign_to_cluster(np.atleast_2d(x))
self.alpha = _update_alpha(self.alpha, self.n_pts, self.n_cl, self.alpha_0)
def _reassign_point(self, id):
"""
Update the probability density reconstruction reassigining an existing sample
Arguments:
id: sample id
"""
x = self.stored_pts[id]
cid = self.assignations[id]
self._remove_from_cluster(x, cid)
self._assign_to_cluster(x, id)
self.alpha = _update_alpha(self.alpha, self.n_pts, (np.array(self.N_list) > 0).sum(), self.alpha_0)
[docs]
def build_mixture(self, make_comp = True):
"""
Instances a mixture class representing the inferred distribution
Arguments:
bool make_comp: whether to instantiate the scipy.stats.multivariate_normal components or not
Returns:
mixture: the inferred distribution
"""
if self.n_cl == 0:
raise FIGAROException("You are trying to build an empty mixture - perhaps you called the initialise() method. If you are using the density_from_samples() method, the inferred mixture is returned by that method as an instance of mixture class.")
# Number of active clusters
n_cl = (np.array(self.N_list) > 0).sum()
means = np.zeros((n_cl, self.dim))
variances = np.zeros((n_cl, self.dim, self.dim))
i = 0
for ss in self.mixture:
if ss.N > 0:
k_n, mu_n, nu_n, L_n = _compute_hyperpars(self.prior.k, self.prior.mu, self.prior.nu, self.prior.L, ss.mean, ss.S, ss.N)
variances[i] = invwishart(df = nu_n, scale = L_n).rvs()
means[i] = mn(mean = mu_n[0], cov = rescale_matrix(variances[i], k_n), allow_singular = True).rvs()
i += 1
w = dirichlet(self.w[self.w > 0]*self.n_pts+self.alpha/self.n_cl).rvs()[0]
return mixture(means, variances, w, self.bounds, self.dim, n_cl, self.n_pts, self.alpha, probit = self.probit, make_comp = make_comp)
# Methods to overwrite density methods
def _rvs_probit(self, size = 1):
"""
Draw samples from mixture in probit space
Arguments:
int size: number of samples to draw
Returns:
np.ndarray: samples in probit space
"""
idx = np.random.choice(np.arange(len(self.w)), p = self.w, size = size)
ctr = Counter(idx)
if self.dim > 1:
samples = np.empty(shape = (1,self.dim))
for i, n in zip(ctr.keys(), ctr.values()):
samples = np.concatenate((samples, np.atleast_2d(mn(self.mixture[i].mu, self.mixture[i].sigma, allow_singular = True).rvs(size = n))))
else:
samples = np.array([np.zeros(1)])
for i, n in zip(ctr.keys(), ctr.values()):
samples = np.concatenate((samples, np.atleast_2d(mn(self.mixture[i].mu[0], self.mixture[i].sigma, allow_singular = True).rvs(size = n)).T))
return samples[1:]
def _pdf_probit(self, x):
"""
Evaluate mixture at point(s) x in probit space
Arguments:
np.ndarray x: point(s) to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.pdf(x)
"""
return np.sum(np.array([w*mn(comp.mu, comp.sigma, allow_singular = True).pdf(x) for comp, w in zip(self.mixture, self.w)]), axis = 0)
def _logpdf_probit(self, x):
"""
Evaluate log mixture at point(s) x in probit space
Arguments:
np.ndarray x: point(s) to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.logpdf(x)
"""
return logsumexp(np.array([w + mn(comp.mu, comp.sigma, allow_singular = True).logpdf(x) for comp, w in zip(self.mixture, self.log_w)]), axis = 0)
def _fast_pdf_probit(self, x):
"""
Evaluate mixture at point x in probit space
Arguments:
np.ndarray x: point to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.pdf(x)
"""
return np.sum(self.w*_mvn_pdf(x, self.means, self.covs, self.inv_covs, self.det_covs))
def _fast_logpdf_probit(self, x):
"""
Evaluate log mixture at point x in probit space
Arguments:
np.ndarray x: point to evaluate the mixture at (in probit space)
Returns:
np.ndarray: mixture.logpdf(x)
"""
return logsumexp(self.log_w+_mvn_logpdf(x, self.means, self.covs, self.inv_covs, self.det_covs))
[docs]
class HDPGMM(DPGMM):
"""
Class to infer a distribution given a set of observations (each being a set of samples).
Child of DPGMM class
Arguments:
iterable bounds: boundaries of the rectangle over which the distribution is defined. It should be in the format [[xmin, xmax],[ymin, ymax],...]
iterable prior_pars: IW parameters
double alpha0: initial guess for concentration parameter
double MC_draws: number of MC draws for integral
bool probit: whether to use the probit transformation or not
int n_reassignments: number of reassignments. Default is not to reassign.
callable selection_function: selection function approximant or samples
np.ndarray injection_pdf: pdf of injected samples (for selection function inclusion)
Returns:
HDPGMM: instance of HDPGMM class
"""
def __init__(self, bounds,
alpha0 = 1.,
prior_pars = None,
MC_draws = None,
probit = True,
n_reassignments = 0.,
selection_function = None,
injection_pdf = None,
total_injections = None,
weights_inj = None,
lower_limit_alpha = 1e-3,
):
bounds = np.atleast_2d(bounds)
self.dim = len(bounds)
super().__init__(bounds = bounds, alpha0 = alpha0, probit = probit, n_reassignments = n_reassignments)
if prior_pars is not None:
self.exp_sigma, self.a = prior_pars
else:
self.exp_sigma, self.a = get_priors(bounds = self.bounds, probit = self.probit, hierarchical = True)
self.invgamma = invgamma(self.a)
if MC_draws is None:
self.MC_draws = int((self.dim+1)*1e3)
else:
self.MC_draws = int(MC_draws)
self.evaluated_logL = {}
# Selection function
self.selfunc = selection_function
self.lower_limit_alpha = lower_limit_alpha
if not callable(self.selfunc) and self.selfunc is not None:
try:
self.log_inj_pdf = np.log(injection_pdf)
self.np_log_inj_pdf = np.copy(np.log(injection_pdf))
self.total_inj = int(total_injections)
if weights_inj is not None:
self.weights_inj = weights_inj
else:
self.weights_inj = np.ones(len(self.log_inj_pdf))
except TypeError:
raise FIGAROException("Please provide injection pdf")
self.selfunc_probit = self.selfunc
self.log_jacobian_inj = np.zeros(len(self.selfunc))
if self.probit:
self.selfunc_probit = np.atleast_2d(transform_to_probit(self.selfunc, self.bounds))
if self.selfunc_probit.shape[0] == 1:
self.selfunc_probit = self.selfunc_probit.T
self.log_jacobian_inj = -probit_logJ(self.selfunc_probit, self.bounds, self.probit)
# MC samples
self._draw_MC_samples()
[docs]
def initialise(self, prior_pars = None, update_injections = False):
"""
Initialise the mixture to initial conditions
"""
super().initialise()
if prior_pars is not None:
self.exp_sigma, self.a = prior_pars
self.evaluated_logL = {}
if self.probit and update_injections:
self.selfunc_probit = np.atleast_2d(transform_to_probit(self.selfunc, self.bounds))
if self.selfunc_probit.shape[0] == 1:
self.selfunc_probit = self.selfunc_probit.T
self.log_jacobian_inj = -probit_logJ(self.selfunc_probit, self.bounds, self.probit)
self._draw_MC_samples()
def _draw_MC_samples(self):
"""
Draws MC samples for mu and sigma
"""
self.sigma_MC = np.array([self.invgamma.rvs(self.MC_draws)*(self.exp_sigma[i]**2*(self.a+1)) for i in range(self.dim)]).T
self.mu_MC = np.random.uniform(low = self.bounds[:,0], high = self.bounds[:,1], size = (self.MC_draws, self.dim))
if self.probit:
self.mu_MC = transform_to_probit(self.mu_MC, self.bounds)
if self.dim == 1:
self.sigma_MC = self.sigma_MC.flatten()
self.mu_MC = self.mu_MC.flatten()
else:
rhos = invwishart(df = self.dim+2, scale = np.identity(self.dim)).rvs(size = self.MC_draws)
rhos = np.array([r/outer_jit(np.sqrt(diag_jit(r)), np.sqrt(diag_jit(r))) for r in rhos])
self.sigma_MC = np.array([r*outer_jit(s,s) for r, s in zip(rhos, np.sqrt(self.sigma_MC))])
if self.selfunc is not None:
# Approximant
if callable(self.selfunc):
with np.errstate(divide = 'ignore', invalid = 'ignore'):
if self.probit:
self.log_alpha_factor = np.array([np.log(np.mean(self.selfunc(transform_from_probit(mn(m,s, allow_singular = True).rvs(self.MC_draws*1), self.bounds)))) for m, s in zip(self.mu_MC, self.sigma_MC)])
else:
self.log_alpha_factor = np.array([np.log(np.mean(self.selfunc(mn(m,s, allow_singular = True).rvs(self.MC_draws*1)))) for m, s in zip(self.mu_MC, self.sigma_MC)])
# Injections
else:
self.log_alpha_factor = np.array([logsumexp(mn(m,s, allow_singular = True).logpdf(self.selfunc_probit) + self.log_jacobian_inj - self.log_inj_pdf, b = self.weights_inj) - np.log(self.total_inj) for m, s in zip(self.mu_MC, self.sigma_MC)])
# TODO: make more efficient
# std = []
# for m, s, a in zip(self.mu_MC, self.sigma_MC, self.log_alpha_factor):
# x = self.weights_inj*(mn(m,s, allow_singular = True).logpdf(self.selfunc_probit) + self.log_jacobian_inj - self.log_inj_pdf)
## std.append(np.sqrt(np.sum((np.exp(x)-np.exp(a))**2)/self.total_inj**2))
# std.append(np.sqrt(np.sum(np.exp(2*x)/self.total_inj**2) - np.exp(2*a)/self.total_inj))
# std = np.array(std)
# self.log_alpha_factor[100*(np.log(std) - self.log_alpha_factor) > np.log(70)] = np.inf
self.log_alpha_factor = np.nan_to_num(self.log_alpha_factor, nan = np.inf, posinf = np.inf, neginf = np.inf)
# Censored regions
self.log_alpha_factor[self.log_alpha_factor < np.log(self.lower_limit_alpha)] = np.inf
self.log_alpha_factor[(self.log_alpha_factor > 0.) & (np.isfinite(self.log_alpha_factor))] = 0.
# import matplotlib.pyplot as plt
# c = plt.scatter(self.mu_MC[:,0], self.mu_MC[:,1], c = 100*std/np.exp(self.log_alpha_factor))
# plt.colorbar(c)
# plt.show()
else:
self.log_alpha_factor = np.zeros(self.MC_draws)
[docs]
def add_new_point(self, ev):
"""
Update the probability density reconstruction adding a new sample
Arguments:
iterable ev: set of single-event draws from a DPGMM inference
"""
x = np.random.choice(ev)
self.stored_pts[len(list(self.stored_pts.keys()))] = x
self._assign_to_cluster(x)
self.alpha = _update_alpha(self.alpha, self.n_pts, self.n_cl, self.alpha_0)
def _cluster_assignment_distribution(self, x, logL_x = None):
"""
Compute the marginal distribution of cluster assignment for each cluster.
Arguments:
figaro.mixture.mixture x: sample
double logL_x: evaluated log likelihood
Returns:
dict: p_i for each component
"""
scores = np.zeros(self.n_cl+1)
logL_N = np.zeros((self.n_cl+1, self.MC_draws))
if logL_x is None:
if self.dim == 1:
logL_x = evaluate_mixture_MC_draws_1d(self.mu_MC, self.sigma_MC, x.means, x.covs, x.w) - self.log_alpha_factor
else:
logL_x = evaluate_mixture_MC_draws(self.mu_MC, self.sigma_MC, x.means, x.covs, x.w) - self.log_alpha_factor
for i in range(self.n_cl+1):
if i == 0:
ss = None
logL_D = np.zeros(self.MC_draws)
scores[i] = np.log(self.alpha)
else:
ss = self.mixture[i-1]
logL_D = ss.logL_D
with np.errstate(divide = 'ignore', invalid = 'ignore'):
scores[i] = np.log(ss.N)
if np.isfinite(scores[i]):
scores[i] += logsumexp_jit(logL_D + logL_x) - logsumexp_jit(logL_D)
else:
scores[i] = -np.inf
logL_N[i] = logL_D + logL_x
norm = logsumexp_jit(scores)
scores = np.exp(scores-norm)
return scores, logL_N, logL_x
def _assign_to_cluster(self, x, pt_id = None, logL_x = None):
"""
Assign the new sample x to an existing cluster or to a new cluster according to the marginal distribution of cluster assignment.
Arguments:
np.ndarray x: sample
int pt_id: point id
logL_x: evaluated log likelihood
"""
if pt_id is None:
pt_id = len(list(self.stored_pts.keys()))-1
self.n_pts += 1
scores, logL_N, logL_x = self._cluster_assignment_distribution(x, logL_x)
try:
cid = np.random.choice(np.arange(-1, self.n_cl), p=scores)
except ValueError:
cid = -1
if cid == -1:
self.mixture.append(_component_h(x, self.dim, self.prior, logL_N[int(cid)+1], self.mu_MC, self.sigma_MC, self.log_alpha_factor))
self.N_list.append(1.)
self.n_cl += 1
self.assignations[pt_id] = int(self.n_cl)-1
else:
self.mixture[int(cid)] = self._add_datapoint_to_component(self.mixture[int(cid)], logL_N[int(cid)+1])
self.N_list[int(cid)] += 1
self.assignations[pt_id] = int(cid)
self.evaluated_logL[pt_id] = logL_x
# Update weights
self.w = np.array(self.N_list)
with np.errstate(divide = 'ignore', invalid = 'ignore'):
self.w = self.w/self.w.sum()
self.log_w = np.log(self.w)
return
def _remove_from_cluster(self, x, cid, logL_x):
"""
Remove the sample x from its assigned cluster.
Arguments:
np.ndarray x: sample
int cid: cluster id
double logL_x: log likelihood for the point
"""
with np.errstate(invalid = 'ignore'):
logL = np.nan_to_num(self.mixture[int(cid)].logL_D - logL_x, nan = -np.inf, posinf = -np.inf, neginf = -np.inf)
self.mixture[int(cid)] = self._remove_datapoint_from_component(self.mixture[int(cid)], logL)
self.N_list[int(cid)] -= 1
self.n_pts -= 1
# Update weights
self.w = np.array(self.N_list)
with np.errstate(divide = 'ignore', invalid = 'ignore'):
self.w = self.w/self.w.sum()
self.log_w = np.log(self.w)
return
def _add_datapoint_to_component(self, ss, logL_D):
"""
Update component parameters after assigning a sample to a component
Arguments:
component ss: component to update
double logL_D: log Likelihood denominator
Returns:
component: updated component
"""
ss.logL_D = logL_D
log_norm_D = logsumexp_jit(logL_D)
idx = np.random.choice(self.MC_draws, p = np.exp(logL_D - log_norm_D))
ss.mu = np.copy(self.mu_MC[idx])
ss.sigma = np.copy(self.sigma_MC[idx])
if self.dim == 1:
ss.mu = np.atleast_2d(ss.mu).T
ss.sigma = np.atleast_2d(ss.sigma).T
ss.N += 1
ss.log_N_true = np.log(ss.N) - self.log_alpha_factor[idx]
return ss
def _remove_datapoint_from_component(self, ss, logL_D):
"""
Update component parameters after assigning a sample to a component
Arguments:
component ss: component to update
double logL_D: log Likelihood denominator
Returns:
component: updated component
"""
ss.logL_D = logL_D
log_norm_D = logsumexp_jit(logL_D)
idx = np.random.choice(self.MC_draws, p = np.exp(logL_D - log_norm_D))
ss.mu = np.copy(self.mu_MC[idx])
ss.sigma = np.copy(self.sigma_MC[idx])
if self.dim == 1:
ss.mu = np.atleast_2d(ss.mu).T
ss.sigma = np.atleast_2d(ss.sigma).T
ss.N -= 1
ss.log_N_true = np.log(ss.N) - self.log_alpha_factor[idx]
return ss
[docs]
def build_mixture(self, make_comp = True):
"""
Instances a mixture class representing the inferred distribution
Arguments:
bool make_comp: whether to instantiate the scipy.stats.multivariate_normal components or not
Returns:
mixture: the inferred distribution
"""
if self.n_cl == 0:
raise FIGAROException("You are trying to build an empty mixture - perhaps you called the initialise() method. If you are using the density_from_samples() method, the inferred mixture is returned by that method as an instance of mixture class.")
idx = np.where(np.array(self.N_list) > 0)[0]
N_pts = np.array([comp.log_N_true for comp in np.array(self.mixture)[idx]])
self.n_cl = (np.array(self.N_list) > 0).sum()
if self.selfunc is not None:
log_w, w = self.log_w, self.w
self.log_w = N_pts - logsumexp_jit(N_pts)
self.w = np.exp(self.log_w)
aa = self.compute_alpha_factor()#self._compute_individual_alpha_factors(idx)
N_pts = (self.w/aa )*self.n_pts
new_w = dirichlet(N_pts+self.alpha/self.n_cl).rvs()[0]
self.w = new_w
self.log_w = np.log(new_w)
alpha_factor = self.compute_alpha_factor()
self.log_w, self.w = log_w, w
else:
alpha_factor = 1.
N_pts = np.exp(N_pts)
new_w = dirichlet(N_pts+self.alpha/self.n_cl).rvs()[0]
return mixture(np.array([comp.mu.flatten() for comp in np.array(self.mixture)[idx]]), np.array([comp.sigma for comp in np.array(self.mixture)[idx]]), new_w, self.bounds, self.dim, self.n_cl, self.n_pts, self.alpha, probit = self.probit, make_comp = make_comp, alpha_factor = alpha_factor)
def _compute_individual_alpha_factors(self, idx):
"""
Compute the integral \\int pdet(theta)p(theta|lambda) dtheta conditioned on the specific DPGMM parameters
Argiments:
list idx: non-empty components
Returns:
double: value of the integral
"""
# Approximant
if callable(self.selfunc):
if probit:
alpha_factors = np.array([np.mean(self.selfunc(transform_from_probit(mn(comp.mu, comp.sigma, allow_singular = True).rvs(self.MC_draws*1), self.bounds))) for comp in np.array(self.mixture)[idx]])
else:
alpha_factors = np.array([np.mean(self.selfunc(mn(comp.mu, comp.sigma, allow_singular = True).rvs(self.MC_draws*1))) for comp in np.array(self.mixture)[idx]])
# Injections
else:
alpha_factors = np.array([np.exp(logsumexp(mn(comp.mu, comp.sigma, allow_singular = True).logpdf(self.selfunc_probit) + self.log_jacobian_inj - self.log_inj_pdf, b = self.weights_inj) - np.log(self.total_inj)) for comp in np.array(self.mixture)[idx]])
return alpha_factors
[docs]
def compute_alpha_factor(self):
"""
Compute the integral \\int pdet(theta)p(theta|lambda) dtheta conditioned on the specific DPGMM parameters
Returns:
double: value of the integral
"""
# Approximant
if callable(self.selfunc):
alpha_factor = np.mean(self.selfunc(self.rvs(self.MC_draws)))
# Injections
else:
alpha_factor = np.exp(logsumexp(self.logpdf(self.selfunc) + self.log_jacobian_inj - self.log_inj_pdf, b = self.weights_inj) - np.log(self.total_inj))
return alpha_factor
[docs]
def density_from_samples(self, events, make_comp = True, initialise = True):
"""
Reconstruct the probability density from a set of samples.
Arguments:
iterable samples: set of single-event draws from DPGMM
bool make_comp: whether to instantiate the scipy.stats.multivariate_normal components or not
Returns:
mixture: the inferred mixture
"""
np.random.shuffle(events)
for ev in events:
self.add_new_point(ev)
# Random Gibbs walk (if required)
for id in np.random.choice(self.n_pts, size = self.n_reassignments, replace = True):
self._reassign_point(id)
d = self.build_mixture(make_comp = make_comp)
if initialise:
self.initialise()
return d
def _reassign_point(self, id):
"""
Update the probability density reconstruction reassigining an existing sample
Arguments:
id: sample id
"""
x = self.stored_pts[id]
cid = self.assignations[id]
logL_x = self.evaluated_logL[id]
self._remove_from_cluster(x, cid, logL_x)
self._assign_to_cluster(x, id, logL_x)
self.alpha = _update_alpha(self.alpha, self.n_pts, (np.array(self.N_list) > 0).sum(), self.alpha_0)