Coverage for src/galsbi/ucat/galaxy_sampling_util.py: 91%
300 statements
coverage.py v7.13.0, created at 2025-12-17 09:47 +0000
# Copyright (C) 2018 ETH Zurich, Institute for Particle Physics and Astrophysics

"""
Created on Mar 6, 2018
author: Joerg Herbel
"""

import math
import warnings

import numpy as np
import scipy.interpolate
import scipy.optimize
import scipy.special

warnings.filterwarnings("once")
EPS = 10e-15


class UCatNumGalError(ValueError):
    """
    Raised when more galaxies than allowed by the input parameters are sampled
    """


class UCatMZInterpError(ValueError):
    """
    Raised when lum_fct_m_max is too low for the given redshift range
    """


class Catalog:
    pass


def apply_pycosmo_distfun(dist_fun, z):
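    """
    Evaluate a PyCosmo background distance function on an array of redshifts,
    computing it only for the unique redshift values and broadcasting the result
    back to the input shape.

    :param dist_fun: PyCosmo distance function taking a scale factor ``a``
    :param z: array of redshifts
    :return: distances in Mpc, one per input redshift
    """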
    z_unique, inv_ind = np.unique(z, return_inverse=True)
    dist = dist_fun(a=1 / (1 + z_unique))[inv_ind]  # Unit: Mpc
    return dist


def intp_z_m_cut(cosmo, mag_calc, par):
    """
    Return an interpolator which predicts the maximum absolute magnitude for a
    given redshift such that a galaxy still falls below the limiting apparent
    magnitude in the reference band (par.gals_mag_max). To do this, we perform a
    check at a number of grid points in redshift. The check consists of finding
    the template with the smallest ratio of flux in the reference band at that
    redshift (apparent flux) to flux in the luminosity function band at redshift
    zero (absolute flux). We then compute the absolute magnitude a galaxy would
    need to have to still pass the cut, assuming that its spectrum is given by
    only that one template. This works because mixing in other templates will
    only make the object brighter in the reference band at this redshift. See
    also docs/jupyter_notebooks/z_m_cut.ipynb.
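
    A minimal usage sketch (``cosmo``, ``mag_calc`` and ``par`` are the PyCosmo
    instance, template magnitude calculator and parameter object this function
    expects; the redshift value is illustrative)::

        intp = intp_z_m_cut(cosmo, mag_calc, par)
        # faintest absolute magnitude at z = 0.5 that still passes the
        # apparent-magnitude cut in the reference band
        m_abs_max = intp(0.5)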
54 """

    def find_max_template_ind(z, n_templates):
        coeffs = np.eye(n_templates)

        m_lf = mag_calc(
            redshifts=np.zeros(n_templates),
            excess_b_v=np.zeros(n_templates),
            coeffs=coeffs,
            filter_names=[par.lum_fct_filter_band],
        )[par.lum_fct_filter_band]

        m_ref = mag_calc(
            redshifts=np.full(n_templates, z),
            excess_b_v=np.zeros(n_templates),
            coeffs=coeffs,
            filter_names=[par.reference_band],
        )[par.reference_band]

        ind = np.argmin(m_ref - m_lf)

        return ind

    def app_mag_ref(z, temp_i, m_abs, n_templates):
        coeff = np.zeros((1, n_templates))
        coeff[0, temp_i] = 1
        coeff[0, temp_i] = 10 ** (
            0.4
            * (
                mag_calc(
                    redshifts=np.zeros(1),
                    excess_b_v=np.zeros(1),
                    coeffs=coeff,
                    filter_names=[par.lum_fct_filter_band],
                )[par.lum_fct_filter_band][0]
                - m_abs
            )
        )
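        # rescale the coefficient from the 10 pc reference distance at which
        # absolute magnitudes are defined (1e-5 Mpc; dist_lum_a returns Mpc)
        # to the luminosity distance at z, with an additional factor
        # 1 / (1 + z) from the redshifting of the template spectrum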
        coeff[0, temp_i] *= (1e-5 / cosmo.background.dist_lum_a(a=1 / (1 + z))) ** 2 / (
            1 + z
        )

        m = mag_calc(
            redshifts=np.atleast_1d(z),
            excess_b_v=np.zeros(1),
            coeffs=coeff,
            filter_names=[par.reference_band],
        )[par.reference_band]

        return m[0]

    n_templates = mag_calc.n_templates
    z_intp = [par.lum_fct_z_res]
    temp_ind = find_max_template_ind(z_intp[0], n_templates)
    abs_mag = [
        scipy.optimize.brentq(
            f=lambda m: app_mag_ref(z_intp[0], temp_ind, m, n_templates)
            - par.gals_mag_max,
            a=-100,
            b=par.gals_mag_max,
        )
    ]

    i_loop = 0
    while True:
        i_loop += 1
        try:
            z_ = z_intp[-1] + 0.02

            if z_ > par.lum_fct_z_max:
                break

            temp_ind = find_max_template_ind(z_, n_templates)
            abs_mag_ = scipy.optimize.brentq(
                f=lambda m,
                temp_ind=temp_ind,
                z_=z_,
                n_templates=n_templates: app_mag_ref(z_, temp_ind, m, n_templates)
                - par.gals_mag_max,
                a=-100,
                b=abs_mag[-1],
            )
            z_intp.append(z_)
            abs_mag.append(abs_mag_)
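        # brentq raises a ValueError when f(a) and f(b) have the same sign,
        # i.e. when the bracket [-100, abs_mag[-1]] no longer contains a root;
        # in that case the interpolation grid simply ends here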
        except ValueError:
            break

    intp = scipy.interpolate.interp1d(
        x=z_intp,
        y=abs_mag,
        kind="cubic",
        bounds_error=False,
        fill_value=(abs_mag[0], abs_mag[-1]),
    )

    if np.any(intp(intp.x) > par.lum_fct_m_max):
        msg = (
            "par.lum_fct_m_max is too low according to z-m-interpolation,"
            " some galaxies may be missing\n"
            f"gals_mag_max={par.gals_mag_max:2.2f}"
            f" lum_fct_z_max={par.lum_fct_z_max:2.2f}"
            f" m_max_sample={np.max(intp(intp.x)):2.2f}"
            f" lum_fct_m_max={par.lum_fct_m_max:2.2f}"
        )
        if par.raise_z_m_interp_error:
            raise UCatMZInterpError(msg)
        else:
            warnings.warn(msg, stacklevel=1)

    return intp


class TruncatedMVN:
    r"""Create a normal distribution :math:`X \sim N(\mu, \Sigma)` subject to
    linear inequality constraints :math:`lb < X < ub` and sample from it using
    minimax tilting. Based on the MATLAB implementation by the authors
    (reference below).

    :param np.ndarray mu: (size D)
        mean of the normal distribution :math:`\mathbf{\mu}`.
    :param np.ndarray cov: (size D x D)
        covariance of the normal distribution :math:`\mathbf{\Sigma}`.
    :param np.ndarray lb: (size D)
        lower bound constraint of the multivariate normal distribution
        :math:`\mathbf{lb}`.
    :param np.ndarray ub: (size D)
        upper bound constraint of the multivariate normal distribution
        :math:`\mathbf{ub}`.
    :param Union[int, None] seed: a random seed.

    Note that the algorithm may not work if 'cov' is close to being rank
    deficient.

    Reference:
    Botev, Z. I., (2016), The normal law under linear restrictions: simulation
    and estimation via minimax tilting, Journal of the Royal Statistical
    Society Series B, 79, issue 1, p. 125-148.

    Reimplementation by Paul Brunzema
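
    A minimal usage sketch (the 2-d numbers are purely illustrative)::

        import numpy as np

        mu = np.zeros(2)
        cov = np.array([[1.0, 0.5], [0.5, 1.0]])
        lb = np.array([0.0, -np.inf])
        ub = np.array([np.inf, 1.0])
        tmvn = TruncatedMVN(mu, cov, lb, ub, seed=42)
        samples = tmvn.sample(100)  # shape (2, 100)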
188 """

    def __init__(self, mu, cov, lb, ub, seed=None):
        self.dim = len(mu)
        if not cov.shape[0] == cov.shape[1]:
            raise RuntimeError("Covariance matrix must be of shape DxD!")
        if not (
            self.dim == cov.shape[0] and self.dim == len(lb) and self.dim == len(ub)
        ):
            raise RuntimeError(
                "Dimensions D of mean (mu), covariance matrix (cov), lower bound (lb) "
                "and upper bound (ub) must be the same!"
            )

        self.cov = cov
        self.orig_mu = mu
        self.orig_lb = lb
        self.orig_ub = ub

        # permuted
        self.lb = lb - mu  # move distr./bounds to have zero mean
        self.ub = ub - mu  # move distr./bounds to have zero mean
        if np.any(self.ub <= self.lb):
            raise RuntimeError(
                "Upper bound (ub) must be strictly greater than lower bound (lb) for "
                "all D dimensions!"
            )

        # scaled Cholesky with zero diagonal, permuted
        self.L = np.empty_like(cov)
        self.unscaled_L = np.empty_like(cov)

        # placeholders for optimization
        self.perm = None
        self.x = None
        self.mu = None
        self.psistar = None

        # for numerics
        self.eps = EPS

        # a random state
        self.random_state = np.random.RandomState(seed)

    def sample(self, n):
        """Create n samples from the truncated normal distribution.

        :param int n: Number of samples to create.
        :return: D x n array with the samples.
        :rtype: np.ndarray
        """
        if not isinstance(n, int):
            raise RuntimeError("Number of samples must be an integer!")

        # factors (Cholesky, etc.) only need to be computed once!
        if self.psistar is None:
            self.compute_factors()

        # start acceptance-rejection sampling
        rv = np.array([], dtype=np.float64).reshape(self.dim, 0)
        accept, iteration = 0, 0
        while accept < n:
            logpr, Z = self.mvnrnd(n, self.mu)  # simulate n proposals
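            # accept each proposal with probability exp(logpr - psistar),
            # implemented equivalently as -log(U) > psistar - logpr for
            # U ~ Unif(0, 1); psistar upper-bounds logpr by construction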
            idx = -np.log(self.random_state.rand(n)) > (
                self.psistar - logpr
            )  # acceptance tests
            rv = np.concatenate((rv, Z[:, idx]), axis=1)  # accumulate accepted
            accept = rv.shape[1]  # keep track of # of accepted
            iteration += 1
            if iteration == 10**3:
                print("Warning: Acceptance prob. smaller than 0.001.")
            elif iteration > 10**4:
                accept = n
                rv = np.concatenate((rv, Z), axis=1)
                print("Warning: Sample is only approximately distributed.")

        # finish sampling and postprocess the samples!
        order = self.perm.argsort(axis=0)
        rv = rv[:, :n]
        rv = self.unscaled_L @ rv
        rv = rv[order, :]

        # retransfer to original mean
        rv += np.tile(
            self.orig_mu.reshape(self.dim, 1), (1, rv.shape[-1])
        )  # Z = X + mu
        return rv

    def compute_factors(self):
        # compute permuted Cholesky factor and solve optimization

        # Cholesky decomposition of matrix with permutation
        self.unscaled_L, self.perm = self.colperm()
        D = np.diag(self.unscaled_L)
        if np.any(self.eps > D):
            print("Warning: Method might fail as covariance matrix is singular!")

        # rescale
        scaled_L = self.unscaled_L / np.tile(D.reshape(self.dim, 1), (1, self.dim))
        self.lb = self.lb / D
        self.ub = self.ub / D

        # remove diagonal
        self.L = scaled_L - np.eye(self.dim)

        # get gradient/Jacobian function
        gradpsi = self.get_gradient_function()
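        # 2 * (dim - 1) unknowns: the tilting means mu and the auxiliary points
        # x, with the last component of each fixed to zero (they are
        # re-appended as 0.0 in psy and gradpsi)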
        x0 = np.zeros(2 * (self.dim - 1))

        # find the optimal tilting parameters with a non-linear equation solver
        sol = scipy.optimize.root(
            gradpsi, x0, args=(self.L, self.lb, self.ub), method="hybr", jac=True
        )
        if not sol.success:
            print("Warning: Method may fail as covariance matrix is close to singular!")
        self.x = sol.x[: self.dim - 1]
        self.mu = sol.x[self.dim - 1 :]

        # compute psi star
        self.psistar = self.psy(self.x, self.mu)

    def reset(self):
        # reset factors -> when sampling, the optimization for the optimal
        # tilting parameters is performed again

        # permuted
        self.lb = self.orig_lb - self.orig_mu  # move distr./bounds to have zero mean
        self.ub = self.orig_ub - self.orig_mu

        # scaled Cholesky with zero diagonal, permuted
        self.L = np.empty_like(self.cov)
        self.unscaled_L = np.empty_like(self.cov)

        # placeholders for optimization
        self.perm = None
        self.x = None
        self.mu = None
        self.psistar = None

    def mvnrnd(self, n, mu):
        # generates the proposals from the exponentially tilted sequential
        # importance sampling pdf
        # output: logpr, log-likelihood of sample
        #         Z, random sample
        mu = np.append(mu, [0.0])
        Z = np.zeros((self.dim, n))
        logpr = 0
        for k in range(self.dim):
            # compute matrix multiplication L @ Z
            col = self.L[k, :k] @ Z[:k, :]
            # compute limits of truncation
            tl = self.lb[k] - mu[k] - col
            tu = self.ub[k] - mu[k] - col
            # simulate N(mu,1) conditional on [tl,tu]
            Z[k, :] = mu[k] + self.trandn(tl, tu)
            # update likelihood ratio
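            # each term is ln P(tl < N(0,1) < tu) + mu_k^2 / 2 - mu_k * Z_k,
            # the log-weight of the exponentially tilted proposal in
            # dimension k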
            logpr += lnNormalProb(tl, tu) + 0.5 * mu[k] ** 2 - mu[k] * Z[k, :]
        return logpr, Z

    def trandn(self, lower_bounds, upper_bounds):
        r"""Sample generator for the truncated standard multivariate normal
        distribution :math:`X \sim N(0, I)` s.t. :math:`lb < X < ub`.

        If you wish to simulate a random variable 'Z' from the non-standard
        Gaussian :math:`N(m, s^2)` conditional on :math:`lb < Z < ub`, then
        first simulate x=TruncatedMVN.trandn((l-m)/s,(u-m)/s) and set Z=m+s*x.
        Infinite values for 'ub' and 'lb' are accepted.

        :param np.ndarray lower_bounds: (size D) lower bound constraint of the
            normal distribution :math:`\mathbf{lb}`.
        :param np.ndarray upper_bounds: (size D) upper bound constraint of the
            normal distribution :math:`\mathbf{ub}`.
        :return: D samples of the truncated normal distribution x ~ N(0, I)
            subject to lb < x < ub.
        :rtype: np.ndarray
        """
        if not len(lower_bounds) == len(upper_bounds):
            raise RuntimeError(
                "Lower bound (lb) and upper bound (ub) must be of the same length!"
            )

        x = np.empty_like(lower_bounds)
        a = 0.66  # threshold used in the MATLAB implementation
        # three cases to consider
        # case 1: a<lb<ub
        case1_mask = lower_bounds > a
        if np.any(case1_mask):
            tl = lower_bounds[case1_mask]
            tu = upper_bounds[case1_mask]
            x[case1_mask] = self.ntail(tl, tu)
        # case 2: lb<ub<-a
        case2_mask = upper_bounds < -a
        if np.any(case2_mask):
            tl = -upper_bounds[case2_mask]
            tu = -lower_bounds[case2_mask]
            x[case2_mask] = -self.ntail(tl, tu)
        # case 3: otherwise use inverse transform or accept-reject
        case3_mask = ~(case1_mask | case2_mask)
        if np.any(case3_mask):
            tl = lower_bounds[case3_mask]
            tu = upper_bounds[case3_mask]
            x[case3_mask] = self.tn(tl, tu)
        return x

    def tn(self, lower_bounds, upper_bounds, tol=2):
        # samples a column vector of length=len(lb)=len(ub) from the standard
        # multivariate normal distribution truncated over the region [lb,ub],
        # where -a<lb<ub<a for some 'a', and lb and ub are column vectors
        # uses acceptance-rejection and the inverse-transform method

        # controls the switch between methods; the threshold can be tuned for
        # maximum speed on each platform
        sw = tol
        x = np.empty_like(lower_bounds)
        # case 1: abs(ub-lb)>tol, uses accept-reject from randn
        large_interval_mask = abs(upper_bounds - lower_bounds) > sw
        if np.any(large_interval_mask):
            tl = lower_bounds[large_interval_mask]
            tu = upper_bounds[large_interval_mask]
            x[large_interval_mask] = self.trnd(tl, tu)

        # case 2: abs(ub-lb)<tol, uses inverse-transform
        small_interval_mask = ~large_interval_mask
        if np.any(small_interval_mask):
            tl = lower_bounds[small_interval_mask]
            tu = upper_bounds[small_interval_mask]
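            # inverse transform: pl = P(Z > tl) and pu = P(Z > tu) are the
            # upper-tail probabilities; a uniform draw between them is mapped
            # back through the inverse complementary error function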
            pl = scipy.special.erfc(tl / np.sqrt(2)) / 2
            pu = scipy.special.erfc(tu / np.sqrt(2)) / 2
            x[small_interval_mask] = np.sqrt(2) * scipy.special.erfcinv(
                2 * (pl - (pl - pu) * self.random_state.rand(len(tl)))
            )
        return x

    def trnd(self, lower_bounds, upper_bounds):
        # uses acceptance-rejection to simulate from the truncated normal
        x = self.random_state.randn(len(lower_bounds))  # sample normal
        test = (x < lower_bounds) | (x > upper_bounds)
        rejected_indices = np.where(test)[0]
        num_rejected = len(rejected_indices)
        while num_rejected > 0:  # while there are rejections
            ly = lower_bounds[rejected_indices]
            uy = upper_bounds[rejected_indices]
            y = self.random_state.randn(len(uy))  # resample
            accepted_mask = (y > ly) & (y < uy)  # accepted
            x[rejected_indices[accepted_mask]] = y[accepted_mask]
            rejected_indices = rejected_indices[~accepted_mask]
            num_rejected = len(rejected_indices)
        return x

    def ntail(self, lower_bounds, upper_bounds):
        # samples a column vector of length=len(lb)=len(ub) from the standard
        # multivariate normal distribution truncated over the region [lb,ub],
        # where lb>0, and lb and ub are column vectors
        # uses acceptance-rejection from a Rayleigh distribution similar to
        # Marsaglia (1964)
        if not len(lower_bounds) == len(upper_bounds):
            raise RuntimeError(
                "Lower bound (lb) and upper bound (ub) must be of the same length!"
            )
        c = (lower_bounds**2) / 2
        n = len(lower_bounds)
        f = np.expm1(c - upper_bounds**2 / 2)
        x = c - np.log(1 + self.random_state.rand(n) * f)  # sample using Rayleigh
        # keep list of rejected
        rejected_indices = np.where(self.random_state.rand(n) ** 2 * x > c)[0]
        num_rejected = len(rejected_indices)
        while num_rejected > 0:  # while there are rejections
            cy = c[rejected_indices]
            y = cy - np.log(
                1 + self.random_state.rand(num_rejected) * f[rejected_indices]
            )
            accepted_mask = (
                self.random_state.rand(num_rejected) ** 2 * y
            ) < cy  # accepted
            x[rejected_indices[accepted_mask]] = y[accepted_mask]  # store the accepted
            rejected_indices = rejected_indices[
                ~accepted_mask
            ]  # remove accepted from the list
            num_rejected = len(rejected_indices)
        return np.sqrt(2 * x)  # this Rayleigh transform can be delayed till the end

    def psy(self, x, mu):
        # implements psi(x, mu); assumes scaled 'L' without diagonal
        x = np.append(x, [0.0])
        mu = np.append(mu, [0.0])
        c = self.L @ x
        lt = self.lb - mu - c
        ut = self.ub - mu - c
        p = np.sum(lnNormalProb(lt, ut) + 0.5 * mu**2 - x * mu)
        return p

    def get_gradient_function(self):
        # wrapper to avoid a dependency on self

        def gradpsi(y, chol_matrix, lower_bounds, upper_bounds):
            # implements the gradient of psi(x) to find the optimal exponential
            # twisting; also returns the Jacobian
            # NOTE: assumes scaled 'L' with zero diagonal
            d = len(upper_bounds)
            c = np.zeros(d)
            mu, x = c.copy(), c.copy()
            x[0 : d - 1] = y[0 : d - 1]
            mu[0 : d - 1] = y[d - 1 :]

            # compute now ~l and ~u
            c[1:d] = chol_matrix[1:d, :] @ x
            lt = lower_bounds - mu - c
            ut = upper_bounds - mu - c

            # compute gradients avoiding catastrophic cancellation
            w = lnNormalProb(lt, ut)
            pl = np.exp(-0.5 * lt**2 - w) / np.sqrt(2 * math.pi)
            pu = np.exp(-0.5 * ut**2 - w) / np.sqrt(2 * math.pi)
            P = pl - pu

            # output the gradient
            dfdx = -mu[0 : d - 1] + (P.T @ chol_matrix[:, 0 : d - 1]).T
            dfdm = mu - x + P
            grad = np.concatenate((dfdx, dfdm[:-1]), axis=0)

            # construct the Jacobian
            lt[np.isinf(lt)] = 0
            ut[np.isinf(ut)] = 0

            dP = -(P**2) + lt * pl - ut * pu
            DL = np.tile(dP.reshape(d, 1), (1, d)) * chol_matrix
            mx = DL - np.eye(d)
            xx = chol_matrix.T @ DL
            mx = mx[:-1, :-1]
            xx = xx[:-1, :-1]
            jacobian = np.block([[xx, mx.T], [mx, np.diag(1 + dP[:-1])]])
            return (grad, jacobian)

        return gradpsi

    def colperm(self):
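        # Cholesky factorization with column pivoting: at each step, move the
        # remaining dimension with the smallest conditional truncation
        # probability to the front; this ordering tends to improve the
        # subsequent tilting optimization (cf. Botev 2016)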
        perm = np.arange(self.dim)
        chol_matrix = np.zeros_like(self.cov)
        z = np.zeros_like(self.orig_mu)

        for j in perm.copy():
            pr = np.ones_like(z) * np.inf  # compute marginal prob.
            remaining_dims = np.arange(j, self.dim)  # search remaining dimensions
            D = np.diag(self.cov)
            s = D[remaining_dims] - np.sum(
                chol_matrix[remaining_dims, 0:j] ** 2, axis=1
            )
            s[s < 0] = self.eps
            s = np.sqrt(s)
            tl = (
                self.lb[remaining_dims] - chol_matrix[remaining_dims, 0:j] @ z[0:j]
            ) / s
            tu = (
                self.ub[remaining_dims] - chol_matrix[remaining_dims, 0:j] @ z[0:j]
            ) / s
            pr[remaining_dims] = lnNormalProb(tl, tu)
            # find smallest marginal dimension
            k = np.argmin(pr)

            # flip dimensions k-->j
            jk = [j, k]
            kj = [k, j]
            self.cov[jk, :] = self.cov[kj, :]  # update rows of cov
            self.cov[:, jk] = self.cov[:, kj]  # update cols of cov
            chol_matrix[jk, :] = chol_matrix[kj, :]  # update only rows of L
            self.lb[jk] = self.lb[kj]  # update integration limits
            self.ub[jk] = self.ub[kj]  # update integration limits
            perm[jk] = perm[kj]  # keep track of permutation

            # construct L sequentially via Cholesky computation
            s = self.cov[j, j] - np.sum(chol_matrix[j, 0:j] ** 2, axis=0)
            if s < -0.01:
                raise RuntimeError("Sigma is not positive semi-definite")
            elif s < 0:
                s = self.eps
            chol_matrix[j, j] = np.sqrt(s)
            new_chol = (
                self.cov[j + 1 : self.dim, j]
                - chol_matrix[j + 1 : self.dim, 0:j] @ chol_matrix[j, 0:j].T
            )
            chol_matrix[j + 1 : self.dim, j] = new_chol / chol_matrix[j, j]

            # find mean value, z(j), of truncated normal
            tl = (self.lb[j] - chol_matrix[j, 0 : j - 1] @ z[0 : j - 1]) / chol_matrix[
                j, j
            ]
            tu = (self.ub[j] - chol_matrix[j, 0 : j - 1] @ z[0 : j - 1]) / chol_matrix[
                j, j
            ]
            w = lnNormalProb(
                tl, tu
            )  # aids in computing the expected value of the trunc. normal
            z[j] = (np.exp(-0.5 * tl**2 - w) - np.exp(-0.5 * tu**2 - w)) / np.sqrt(
                2 * math.pi
            )
        return chol_matrix, perm


def lnNormalProb(a, b):
    # computes ln(P(a<Z<b)) where Z~N(0,1) very accurately for any 'a', 'b'
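    # the computation is split into three cases so that each regime uses a
    # formulation (lnPhi of the appropriate tail, or erfc with log1p) that
    # avoids catastrophic cancellation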
    p = np.zeros_like(a)
    # case b>a>0
    positive_mask = a > 0
    if np.any(positive_mask):
        pa = lnPhi(a[positive_mask])
        pb = lnPhi(b[positive_mask])
        p[positive_mask] = pa + np.log1p(-np.exp(pb - pa))
    # case a<b<0
    negative_mask = b < 0
    if np.any(negative_mask):
        pa = lnPhi(-a[negative_mask])  # log of lower tail
        pb = lnPhi(-b[negative_mask])
        p[negative_mask] = pb + np.log1p(-np.exp(pa - pb))
    # case a < 0 < b
    mixed_mask = (~positive_mask) & (~negative_mask)
    if np.any(mixed_mask):
        pa = scipy.special.erfc(-a[mixed_mask] / np.sqrt(2)) / 2  # lower tail
        pb = scipy.special.erfc(b[mixed_mask] / np.sqrt(2)) / 2  # upper tail
        p[mixed_mask] = np.log1p(-pa - pb)
    return p


def lnPhi(x):
    # computes the logarithm of the tail of Z~N(0,1), mitigating numerical
    # roundoff errors
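    # uses the scaled complementary error function erfcx(y) = exp(y^2) erfc(y):
    #   P(Z > x) = erfc(x / sqrt(2)) / 2 = exp(-x^2 / 2) * erfcx(x / sqrt(2)) / 2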
    out = (
        -0.5 * x**2 - np.log(2) + np.log(scipy.special.erfcx(x / np.sqrt(2)) + EPS)
    )  # EPS guards against taking log(0) when the tail underflows
    return out