Coverage for estats/stats/CrossCLs.py: 77%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2019 ETH Zurich
2# Institute for Particle Physics and Astrophysics
3# Author: Dominik Zuercher
5import numpy as np
6import healpy as hp
def context():
    """
    Defines the parameters used by the plugin.

    :return: (required parameter names, their default values,
        their type strings, statistic type)
    """
    stat_type = 'convergence'

    # (name, default, type) triplet for every parameter the plugin reads
    params = [
        ('lmax', 1000, 'int'),
        ('CrossCLs_sliced_bins', 15, 'int'),
        ('CrossCLs_min', 100, 'int'),
        ('CrossCLs_max', 1000, 'int'),
        ('n_tomo_bins', 4, 'int'),
        ('selected_l_range', '0-10000', 'str'),
        ('CrossCLs_custom', [], 'list'),
        ('bin', 0, 'int'),
        ('weight_cls', False, 'bool'),
    ]
    required = [name for name, _, _ in params]
    defaults = [default for _, default, _ in params]
    types = [type_str for _, _, type_str in params]
    return required, defaults, types, stat_type
def _effective_weights(mp, weights, ctx):
    """Return the pixel-weight map to use for mp.

    Falls back to unit weights when Cl weighting is disabled or no
    weight map was passed; zero-weight pixels of a provided map stay
    masked (weight 0) even in the unit-weight fallback.
    """
    if (not ctx['weight_cls']) or (len(weights) == 0):
        weights_ = np.ones_like(mp, dtype=ctx['prec'])
        # BUGFIX: only apply the zero-weight mask when a weight map was
        # actually provided. Indexing with the empty boolean mask from
        # np.isclose([], 0.0) does not match the map shape and raises
        # an IndexError in numpy.
        if len(weights) > 0:
            weights_[np.isclose(weights, 0.0)] = 0.0
        return weights_
    return weights


def _masked_weighted_map(mp, weights):
    """Apply weights to mp, flagging zero-weight / UNSEEN pixels as UNSEEN."""
    select_seen = weights > 0
    select_unseen = ~select_seen
    select_unseen |= mp == hp.UNSEEN
    mp_w = (mp * weights).astype(np.float64)
    mp_w[select_unseen] = hp.UNSEEN
    return mp_w


def CrossCLs(map1, map2, weights1, weights2, ctx):
    """
    Calculates the Cross Angular Power spectrum of two convergence maps.

    :param map1: A Healpix convergence map
    :param map2: A second Healpix convergence map
    :param weights1: A Healpix map with pixel weights
    :param weights2: A second Healpix map with pixel weights
    :param ctx: Context instance (reads 'lmax', 'weight_cls' and 'prec';
        NOTE(review): 'prec' is not declared in context() — presumably
        supplied globally; confirm against the caller)
    :return: Cross CLs as an array of shape (1, lmax)
    """
    lmax = ctx['lmax'] - 1

    # check if weights are set, falling back to unit weights otherwise
    weights1_ = _effective_weights(map1, weights1, ctx)
    weights2_ = _effective_weights(map2, weights2, ctx)

    # proper weight masking: weighted maps with UNSEEN propagated so
    # anafast ignores masked pixels
    map1_w = _masked_weighted_map(map1, weights1_)
    map2_w = _masked_weighted_map(map2, weights2_)

    cls = np.array(hp.sphtfunc.anafast(map1_w, map2_w, lmax=lmax))
    cls = cls.reshape(1, cls.size)

    return cls
def process(data, ctx, scale_to_unity=False):
    """
    Post-process the data vector, optionally rescaling it by 1e4.

    NOTE: the rescaling uses in-place multiplication, so array-like
    inputs are modified in place.

    :param data: data vector (array-like supporting ``*=``)
    :param ctx: Context instance (unused here)
    :param scale_to_unity: if True, multiply the data by 1e4 in place
    :return: the (possibly rescaled) data vector
    """
    if not scale_to_unity:
        return data
    data *= 1e4
    return data
def slice(ctx):
    """
    Defines how the data vector is sliced into bins.

    :param ctx: Context instance
    :return: (number of scales, number of sliced bins,
        binning operation, data vectors per scale)
    """
    # a single scale with one data vector, averaged within each bin
    mult = 1
    num_of_scales = 1
    operation = 'mean'

    # a custom binning scheme overrides the configured bin count
    custom_bins = ctx['CrossCLs_custom']
    n_bins_sliced = (len(custom_bins) if len(custom_bins) > 0
                     else ctx['CrossCLs_sliced_bins'])

    return num_of_scales, n_bins_sliced, operation, mult
def decide_binning_scheme(data, meta, bin, ctx):
    """
    Decide how the multipole range is split into bins.

    Either uses the custom (lower, upper) pairs from
    ctx['CrossCLs_custom'], or splits [CrossCLs_min, CrossCLs_max]
    into CrossCLs_sliced_bins equal-width bins.

    :param data: unused here (kept for plugin interface compatibility)
    :param meta: unused here (kept for plugin interface compatibility)
    :param bin: unused here (kept for plugin interface compatibility)
    :param ctx: Context instance
    :return: (bin_edge_indices, bin_centers) with shapes
        (num_of_scales, n_bins + 1) and (num_of_scales, n_bins)
    """
    # Perform simple equal width splitting for Cls.
    if len(ctx['CrossCLs_custom']) > 0:
        # Custom scheme: each entry is a (lower, upper) multipole pair.
        edges = np.asarray(ctx['CrossCLs_custom'])

        num_of_scales = 1
        bin_centers = np.zeros((num_of_scales, edges.shape[0]))
        bin_edge_indices = np.zeros((num_of_scales, edges.shape[0] + 1))

        lower_edges = edges[:, 0]
        upper_edges = edges[:, 1]

        # Edges: every lower edge plus the final upper edge.
        # NOTE(review): this assumes the custom bins are contiguous and
        # sorted; gaps between consecutive bins are silently ignored —
        # confirm upstream guarantees contiguity.
        bin_edge_indices[0, :-1] = lower_edges
        bin_edge_indices[0, -1] = upper_edges[-1]
        id_larger = bin_edge_indices > ctx['lmax']
        # With sorted edges, two or more edges beyond lmax means whole
        # bins lie outside the data; a single one can only be the last
        # edge, which is clamped below.
        if np.sum(id_larger) > 1:
            raise Exception(
                "Your custom binning scheme requires more multipols than "
                "given in the data!")
        elif id_larger[0, -1]:
            # only the final edge exceeds lmax -> clamp it to lmax
            bin_edge_indices[0, -1] = ctx['lmax']
        # bin centers are the midpoints between consecutive edges
        bin_centers[0] = bin_edge_indices[0, :-1] \
            + 0.5 * (bin_edge_indices[0, 1:] - bin_edge_indices[0, :-1])
    else:
        # Equal-width scheme: split [CrossCLs_min, CrossCLs_max] into
        # n_bins_sliced bins, absorbing the division remainder into the
        # first (remain_front extra) and last (remain_back extra) bins.
        range_edges = [ctx['CrossCLs_min'], ctx['CrossCLs_max']]
        num_of_scales = 1
        n_bins_sliced = ctx['CrossCLs_sliced_bins']
        bin_centers = np.zeros((num_of_scales, n_bins_sliced))
        bin_edge_indices = np.zeros((num_of_scales, n_bins_sliced + 1))

        # Cut out desired l range
        minimum = range_edges[0]
        maximum = range_edges[1]
        diff = maximum - minimum

        per_bin = diff // n_bins_sliced
        remain = diff % n_bins_sliced
        remain_front = remain // 2
        remain_back = remain_front + remain % 2

        # Decide on edge indices: regular per_bin steps, then stretch the
        # first edge back to `minimum` and append `maximum` as last edge.
        bin_edge_indices_temp = np.arange(
            remain_front + minimum, maximum - remain_back, per_bin)
        bin_edge_indices_temp[0] -= remain_front
        bin_edge_indices_temp = np.append(bin_edge_indices_temp, maximum)

        # Decide on central bin values (midpoint of each edge pair)
        bin_centers_temp = np.zeros(0)
        for jj in range(len(bin_edge_indices_temp) - 1):
            bin_centers_temp = np.append(
                bin_centers_temp,
                np.nanmean(bin_edge_indices_temp[jj:jj + 2]))

        # For consistency with other binning scheme
        # assign same binning to all scales
        for scale in range(num_of_scales):
            bin_centers[scale, :] = bin_centers_temp
            bin_edge_indices[scale, :] = bin_edge_indices_temp

    return bin_edge_indices, bin_centers
def filter(ctx):
    """
    Build a boolean mask selecting the Cl bins that fall strictly inside
    the configured multipole range.

    :param ctx: Context instance
    :return: boolean mask over the sliced bins
    """
    bin_edge_indices = decide_binning_scheme(None, None, None, ctx)[0][0]
    selected = ctx['selected_l_range']
    if ',' in selected:
        # assuming list of l ranges. One for each bin combination
        if ctx['n_tomo_bins'] == 0:
            raise Exception(
                "Passed a list of l-ranges for non-tomographic data vector.")
        l_range = selected.split(',')[ctx['bin']]
    else:
        l_range = selected

    range_parts = l_range.split('-')
    lower_edge = int(range_parts[0])
    upper_edge = int(range_parts[1])
    # a bin survives only if both of its edges are strictly inside range
    mask = (bin_edge_indices[:-1] > lower_edge) \
        & (bin_edge_indices[1:] < upper_edge)
    return mask