Coverage for estats/stats/CLs.py: 76%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2019 ETH Zurich
2# Institute for Particle Physics and Astrophysics
3# Author: Dominik Zuercher
5import numpy as np
6import healpy as hp
def context():
    """
    Declares the configuration parameters consumed by this plugin.

    :return: tuple (required parameter names, default values,
             type tags, statistic type)
    """
    # single table: (name, default, type tag) per parameter
    params = [
        ('lmax', 1000, 'int'),
        ('CLs_sliced_bins', 15, 'int'),
        ('CLs_min', 100, 'int'),
        ('CLs_max', 1000, 'int'),
        ('n_tomo_bins', 4, 'int'),
        ('selected_l_range', '0-10000', 'str'),
        ('CLs_custom', [], 'list'),
        ('bin', 0, 'int'),
        ('weight_cls', False, 'bool'),
    ]
    required = [name for name, _, _ in params]
    defaults = [default for _, default, _ in params]
    types = [type_tag for _, _, type_tag in params]
    return required, defaults, types, 'convergence'
def CLs(map, weights, ctx):
    """
    Calculates the Angular Power spectrum.

    :param map: A Healpix convergence map
    :param weights: A Healpix map with pixel weights (may be an empty
                    array if no weights are available)
    :param ctx: Context instance
    :return: Cross CLs as a (1, lmax) array
    """
    lmax = ctx['lmax'] - 1

    # check if weights are set
    if (not ctx['weight_cls']) | (len(weights) == 0):
        # ignore the weight amplitudes but keep zero-weight pixels masked
        weights_ = np.ones_like(map, dtype=ctx['prec'])
        # BUGFIX: only apply the zero-weight mask when a weight map of
        # matching size was actually passed; previously an empty weights
        # array produced an empty boolean mask that cannot index
        # weights_ and raised an IndexError.
        if len(weights) == len(map):
            weights_[np.isclose(weights, 0.0)] = 0.0
    else:
        weights_ = weights

    # proper weight masking: pixels with no weight, or flagged UNSEEN
    # in the input map, are marked UNSEEN in the weighted map
    select_seen = weights_ > 0
    select_unseen = ~select_seen
    select_unseen |= map == hp.UNSEEN
    map_w = (map * weights_).astype(np.float64)
    map_w[select_unseen] = hp.UNSEEN

    cls = np.array(hp.sphtfunc.anafast(map_w, map_w, lmax=lmax))
    # reshape to (1, n) for consistency with the slicing machinery
    cls = cls.reshape(1, cls.size)

    return cls
def process(data, ctx, scale_to_unity=False):
    """
    Post-processes the CL data vector.

    :param data: the CL data vector
    :param ctx: Context instance (unused here)
    :param scale_to_unity: if True, rescale data by 1e4 (in place for
                           mutable array types)
    :return: the (possibly rescaled) data vector
    """
    if not scale_to_unity:
        return data
    data *= 1e4
    return data
def slice(ctx):
    """
    Returns the slicing configuration for the CL statistic.

    :param ctx: Context instance
    :return: tuple (number of scales, number of sliced bins,
             bin-assembly operation, datavectors per scale)
    """
    # a non-empty custom binning scheme overrides the configured
    # equal-width bin count
    custom = ctx['CLs_custom']
    if len(custom) > 0:
        n_bins_sliced = len(custom)
    else:
        n_bins_sliced = ctx['CLs_sliced_bins']

    # one scale, one datavector per scale, bins assembled via their mean
    return 1, n_bins_sliced, 'mean', 1
def decide_binning_scheme(data, meta, bin, ctx):
    """
    Decides on the multipole binning scheme for the CL data vector.

    Either uses the custom bin edges given in ctx['CLs_custom'], or
    performs simple equal-width splitting of the range
    [ctx['CLs_min'], ctx['CLs_max']] into ctx['CLs_sliced_bins'] bins.

    :param data: unused here (kept for the plugin interface)
    :param meta: unused here (kept for the plugin interface)
    :param bin: unused here (kept for the plugin interface)
    :return: tuple (bin edge indices of shape (num_of_scales, n_bins + 1),
             bin centers of shape (num_of_scales, n_bins))
    """
    # Perform simple equal width splitting for Cls.
    if len(ctx['CLs_custom']) > 0:
        # custom scheme: each entry is a (lower, upper) multipole pair
        edges = np.asarray(ctx['CLs_custom'])

        num_of_scales = 1
        bin_centers = np.zeros((num_of_scales, edges.shape[0]))
        bin_edge_indices = np.zeros((num_of_scales, edges.shape[0] + 1))

        lower_edges = edges[:, 0]
        upper_edges = edges[:, 1]

        # assemble edges as lower edges plus the final upper edge
        # (assumes contiguous, ordered bins)
        bin_edge_indices[0, :-1] = lower_edges
        bin_edge_indices[0, -1] = upper_edges[-1]
        id_larger = bin_edge_indices > ctx['lmax']
        if np.sum(id_larger) > 1:
            # more than one edge beyond lmax -> scheme cannot be clipped
            raise Exception(
                "Your custom binning scheme requires more multipols than "
                "given in the data!")
        elif id_larger[0, -1]:
            # only the last edge exceeds lmax: clip it to the data range
            bin_edge_indices[0, -1] = ctx['lmax']
        # centers are the midpoints between consecutive edges
        bin_centers[0] = bin_edge_indices[0, :-1] \
            + 0.5 * (bin_edge_indices[0, 1:] - bin_edge_indices[0, :-1])
    else:
        range_edges = [ctx['CLs_min'], ctx['CLs_max']]
        num_of_scales = 1
        n_bins_sliced = ctx['CLs_sliced_bins']
        bin_centers = np.zeros((num_of_scales, n_bins_sliced))
        bin_edge_indices = np.zeros((num_of_scales, n_bins_sliced + 1))

        # Cut out desired l range
        minimum = range_edges[0]
        maximum = range_edges[1]
        diff = maximum - minimum

        # equal-width bins; the indivisible remainder is split between a
        # wider first bin (remain_front) and a wider last bin (remain_back)
        per_bin = diff // n_bins_sliced
        remain = diff % n_bins_sliced
        remain_front = remain // 2
        remain_back = remain_front + remain % 2

        # Decide on edge indices
        bin_edge_indices_temp = np.arange(
            remain_front + minimum, maximum - remain_back, per_bin)
        # stretch the first bin back to the true minimum
        bin_edge_indices_temp[0] -= remain_front
        # close the last bin at the true maximum
        bin_edge_indices_temp = np.append(bin_edge_indices_temp, maximum)

        # Decide on central bin values (midpoint of each edge pair)
        bin_centers_temp = np.zeros(0)
        for jj in range(len(bin_edge_indices_temp) - 1):
            bin_centers_temp = np.append(
                bin_centers_temp,
                np.nanmean(bin_edge_indices_temp[jj:jj + 2]))

        # For consistency with other binning scheme
        # assign same binning to all scales
        for scale in range(num_of_scales):
            bin_centers[scale, :] = bin_centers_temp
            bin_edge_indices[scale, :] = bin_edge_indices_temp

    return bin_edge_indices, bin_centers
def filter(ctx):
    """
    Builds a boolean mask selecting the sliced bins that lie strictly
    inside the configured multipole range.

    :param ctx: Context instance
    :return: boolean array with one entry per sliced bin
    """
    edges = decide_binning_scheme(None, None, None, ctx)[0][0]

    selected = ctx['selected_l_range']
    if ',' in selected:
        # comma-separated list of l-ranges, one per bin combination;
        # pick the range belonging to the current bin
        if ctx['n_tomo_bins'] == 0:
            raise Exception(
                "Passed a list of l-ranges for non-tomographic data vector.")
        selected = selected.split(',')[ctx['bin']]

    parts = selected.split('-')
    lower_edge = int(parts[0])
    upper_edge = int(parts[1])
    # keep bins whose lower edge is above lower_edge and whose upper
    # edge is below upper_edge (both strict)
    return (edges[:-1] > lower_edge) & (edges[1:] < upper_edge)