Coverage for estats/stats/CrossCLs.py: 77%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# Copyright (C) 2019 ETH Zurich 

2# Institute for Particle Physics and Astrophysics 

3# Author: Dominik Zuercher 

4 

5import numpy as np 

6import healpy as hp 

7 

8 

9def context(): 

10 """ 

11 Defines the paramters used by the plugin 

12 """ 

13 stat_type = 'convergence' 

14 

15 required = ['lmax', 'CrossCLs_sliced_bins', 'CrossCLs_min', 

16 'CrossCLs_max', 'n_tomo_bins', 'selected_l_range', 

17 'CrossCLs_custom', 'bin', 'weight_cls'] 

18 defaults = [1000, 15, 100, 1000, 4, '0-10000', [], 0, False] 

19 types = ['int', 'int', 'int', 'int', 'int', 'str', 

20 'list', 'int', 'bool'] 

21 return required, defaults, types, stat_type 

22 

23 

24def CrossCLs(map1, map2, weights1, weights2, ctx): 

25 """ 

26 Calculates the Cross Angular Power spectrum of two convergence maps. 

27 :param map1: A Healpix convergence map 

28 :param map2: A second Healpix convergence map 

29 :param weights1: A Healpix map with pixel weights 

30 :param weights2: A second Healpix map with pixel weights 

31 :param ctx: Context instance 

32 :return: Cross CLs 

33 """ 

34 lmax = ctx['lmax'] - 1 

35 

36 # check if weights are set 

37 if (not ctx['weight_cls']) | (len(weights1) == 0): 

38 weights1_ = np.ones_like(map1, dtype=ctx['prec']) 

39 weights1_[np.isclose(weights1, 0.0)] = 0.0 

40 else: 

41 weights1_ = weights1 

42 if (not ctx['weight_cls']) | (len(weights2) == 0): 

43 weights2_ = np.ones_like(map2, dtype=ctx['prec']) 

44 weights2_[np.isclose(weights2, 0.0)] = 0.0 

45 else: 

46 weights2_ = weights2 

47 

48 # proper weight masking 

49 select_seen = weights1_ > 0 

50 select_unseen = ~select_seen 

51 select_unseen |= map1 == hp.UNSEEN 

52 map1_w = (map1 * weights1_).astype(np.float64) 

53 map1_w[select_unseen] = hp.UNSEEN 

54 

55 select_seen = weights2_ > 0 

56 select_unseen = ~select_seen 

57 select_unseen |= map2 == hp.UNSEEN 

58 map2_w = (map2 * weights2_).astype(np.float64) 

59 map2_w[select_unseen] = hp.UNSEEN 

60 

61 cls = np.array(hp.sphtfunc.anafast(map1_w, map2_w, lmax=lmax)) 

62 cls = cls.reshape(1, cls.size) 

63 

64 return cls 

65 

66 

67def process(data, ctx, scale_to_unity=False): 

68 if scale_to_unity: 

69 data *= 1e4 

70 return data 

71 

72 

73def slice(ctx): 

74 # number of datavectors for each scale 

75 mult = 1 

76 # number of scales 

77 num_of_scales = 1 

78 # either mean or sum, for how to assemble the data into the bins 

79 operation = 'mean' 

80 

81 if len(ctx['CrossCLs_custom']) > 0: 

82 n_bins_sliced = len(ctx['CrossCLs_custom']) 

83 else: 

84 n_bins_sliced = ctx['CrossCLs_sliced_bins'] 

85 

86 return num_of_scales, n_bins_sliced, operation, mult 

87 

88 

89def decide_binning_scheme(data, meta, bin, ctx): 

90 # Perform simple equal width splitting for Cls. 

91 if len(ctx['CrossCLs_custom']) > 0: 

92 edges = np.asarray(ctx['CrossCLs_custom']) 

93 

94 num_of_scales = 1 

95 bin_centers = np.zeros((num_of_scales, edges.shape[0])) 

96 bin_edge_indices = np.zeros((num_of_scales, edges.shape[0] + 1)) 

97 

98 lower_edges = edges[:, 0] 

99 upper_edges = edges[:, 1] 

100 

101 bin_edge_indices[0, :-1] = lower_edges 

102 bin_edge_indices[0, -1] = upper_edges[-1] 

103 id_larger = bin_edge_indices > ctx['lmax'] 

104 if np.sum(id_larger) > 1: 

105 raise Exception( 

106 "Your custom binning scheme requires more multipols than " 

107 "given in the data!") 

108 elif id_larger[0, -1]: 

109 bin_edge_indices[0, -1] = ctx['lmax'] 

110 bin_centers[0] = bin_edge_indices[0, :-1] \ 

111 + 0.5 * (bin_edge_indices[0, 1:] - bin_edge_indices[0, :-1]) 

112 else: 

113 range_edges = [ctx['CrossCLs_min'], ctx['CrossCLs_max']] 

114 num_of_scales = 1 

115 n_bins_sliced = ctx['CrossCLs_sliced_bins'] 

116 bin_centers = np.zeros((num_of_scales, n_bins_sliced)) 

117 bin_edge_indices = np.zeros((num_of_scales, n_bins_sliced + 1)) 

118 

119 # Cut out desired l range 

120 minimum = range_edges[0] 

121 maximum = range_edges[1] 

122 diff = maximum - minimum 

123 

124 per_bin = diff // n_bins_sliced 

125 remain = diff % n_bins_sliced 

126 remain_front = remain // 2 

127 remain_back = remain_front + remain % 2 

128 

129 # Decide on edge indices 

130 bin_edge_indices_temp = np.arange( 

131 remain_front + minimum, maximum - remain_back, per_bin) 

132 bin_edge_indices_temp[0] -= remain_front 

133 bin_edge_indices_temp = np.append(bin_edge_indices_temp, maximum) 

134 

135 # Decide on central bin values 

136 bin_centers_temp = np.zeros(0) 

137 for jj in range(len(bin_edge_indices_temp) - 1): 

138 bin_centers_temp = np.append( 

139 bin_centers_temp, 

140 np.nanmean(bin_edge_indices_temp[jj:jj + 2])) 

141 

142 # For consistency with other binning scheme 

143 # assign same binning to all scales 

144 for scale in range(num_of_scales): 

145 bin_centers[scale, :] = bin_centers_temp 

146 bin_edge_indices[scale, :] = bin_edge_indices_temp 

147 

148 return bin_edge_indices, bin_centers 

149 

150 

151def filter(ctx): 

152 bin_edge_indices = decide_binning_scheme(None, None, None, ctx)[0][0] 

153 if ',' in ctx['selected_l_range']: 

154 # assuming list of l ranges. One for each bin combination 

155 if ctx['n_tomo_bins'] == 0: 

156 raise Exception( 

157 "Passed a list of l-ranges for non-tomographic data vector.") 

158 range = ctx['selected_l_range'].split(',')[ctx['bin']] 

159 else: 

160 range = ctx['selected_l_range'] 

161 

162 lower_edge = int(range.split('-')[0]) 

163 upper_edge = int(range.split('-')[1]) 

164 filter = bin_edge_indices[:-1] > lower_edge 

165 filter &= bin_edge_indices[1:] < upper_edge 

166 return filter