Coverage for estats/stats/CLs.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

84 statements  

1# Copyright (C) 2019 ETH Zurich 

2# Institute for Particle Physics and Astrophysics 

3# Author: Dominik Zuercher 

4 

5import numpy as np 

6import healpy as hp 

7 

8 

9def context(): 

10 """ 

11 Defines the paramters used by the plugin 

12 """ 

13 stat_type = 'convergence' 

14 

15 required = ['lmax', 'CLs_sliced_bins', 'CLs_min', 

16 'CLs_max', 'n_tomo_bins', 'selected_l_range', 

17 'CLs_custom', 'bin', 'weight_cls'] 

18 defaults = [1000, 15, 100, 1000, 4, '0-10000', [], 0, False] 

19 types = ['int', 'int', 'int', 'int', 'int', 'str', 'list', 'int', 

20 'bool'] 

21 return required, defaults, types, stat_type 

22 

23 

24def CLs(map, weights, ctx): 

25 """ 

26 Calculates the Angular Power spectrum. 

27 :param map: A Healpix convergence map 

28 :param weights: A Healpix map with pixel weights 

29 :param ctx: Context instance 

30 :return: Cross CLs 

31 """ 

32 lmax = ctx['lmax'] - 1 

33 

34 # check if weights are set 

35 if (not ctx['weight_cls']) | (len(weights) == 0): 

36 weights_ = np.ones_like(map, dtype=ctx['prec']) 

37 weights_[np.isclose(weights, 0.0)] = 0.0 

38 else: 

39 weights_ = weights 

40 

41 # proper weight masking 

42 select_seen = weights_ > 0 

43 select_unseen = ~select_seen 

44 select_unseen |= map == hp.UNSEEN 

45 map_w = (map * weights_).astype(np.float64) 

46 map_w[select_unseen] = hp.UNSEEN 

47 

48 cls = np.array(hp.sphtfunc.anafast(map_w, map_w, lmax=lmax)) 

49 cls = cls.reshape(1, cls.size) 

50 

51 return cls 

52 

53 

54def process(data, ctx, scale_to_unity=False): 

55 if scale_to_unity: 

56 data *= 1e4 

57 return data 

58 

59 

60def slice(ctx): 

61 # number of datavectors for each scale 

62 mult = 1 

63 # number of scales 

64 num_of_scales = 1 

65 # either mean or sum, for how to assemble the data into the bins 

66 operation = 'mean' 

67 

68 if len(ctx['CLs_custom']) > 0: 

69 n_bins_sliced = len(ctx['CLs_custom']) 

70 else: 

71 n_bins_sliced = ctx['CLs_sliced_bins'] 

72 

73 return num_of_scales, n_bins_sliced, operation, mult 

74 

75 

76def decide_binning_scheme(data, meta, bin, ctx): 

77 # Perform simple equal width splitting for Cls. 

78 if len(ctx['CLs_custom']) > 0: 

79 edges = np.asarray(ctx['CLs_custom']) 

80 

81 num_of_scales = 1 

82 bin_centers = np.zeros((num_of_scales, edges.shape[0])) 

83 bin_edge_indices = np.zeros((num_of_scales, edges.shape[0] + 1)) 

84 

85 lower_edges = edges[:, 0] 

86 upper_edges = edges[:, 1] 

87 

88 bin_edge_indices[0, :-1] = lower_edges 

89 bin_edge_indices[0, -1] = upper_edges[-1] 

90 id_larger = bin_edge_indices > ctx['lmax'] 

91 if np.sum(id_larger) > 1: 

92 raise Exception( 

93 "Your custom binning scheme requires more multipols than " 

94 "given in the data!") 

95 elif id_larger[0, -1]: 

96 bin_edge_indices[0, -1] = ctx['lmax'] 

97 bin_centers[0] = bin_edge_indices[0, :-1] \ 

98 + 0.5 * (bin_edge_indices[0, 1:] - bin_edge_indices[0, :-1]) 

99 else: 

100 range_edges = [ctx['CLs_min'], ctx['CLs_max']] 

101 num_of_scales = 1 

102 n_bins_sliced = ctx['CLs_sliced_bins'] 

103 bin_centers = np.zeros((num_of_scales, n_bins_sliced)) 

104 bin_edge_indices = np.zeros((num_of_scales, n_bins_sliced + 1)) 

105 

106 # Cut out desired l range 

107 minimum = range_edges[0] 

108 maximum = range_edges[1] 

109 diff = maximum - minimum 

110 

111 per_bin = diff // n_bins_sliced 

112 remain = diff % n_bins_sliced 

113 remain_front = remain // 2 

114 remain_back = remain_front + remain % 2 

115 

116 # Decide on edge indices 

117 bin_edge_indices_temp = np.arange( 

118 remain_front + minimum, maximum - remain_back, per_bin) 

119 bin_edge_indices_temp[0] -= remain_front 

120 bin_edge_indices_temp = np.append(bin_edge_indices_temp, maximum) 

121 

122 # Decide on central bin values 

123 bin_centers_temp = np.zeros(0) 

124 for jj in range(len(bin_edge_indices_temp) - 1): 

125 bin_centers_temp = np.append( 

126 bin_centers_temp, 

127 np.nanmean(bin_edge_indices_temp[jj:jj + 2])) 

128 

129 # For consistency with other binning scheme 

130 # assign same binning to all scales 

131 for scale in range(num_of_scales): 

132 bin_centers[scale, :] = bin_centers_temp 

133 bin_edge_indices[scale, :] = bin_edge_indices_temp 

134 

135 return bin_edge_indices, bin_centers 

136 

137 

138def filter(ctx): 

139 bin_edge_indices = decide_binning_scheme(None, None, None, ctx)[0][0] 

140 if ',' in ctx['selected_l_range']: 

141 # assuming list of l ranges. One for each bin combination 

142 if ctx['n_tomo_bins'] == 0: 

143 raise Exception( 

144 "Passed a list of l-ranges for non-tomographic data vector.") 

145 range = ctx['selected_l_range'].split(',')[ctx['bin']] 

146 else: 

147 range = ctx['selected_l_range'] 

148 

149 lower_edge = int(range.split('-')[0]) 

150 upper_edge = int(range.split('-')[1]) 

151 filter = bin_edge_indices[:-1] > lower_edge 

152 filter &= bin_edge_indices[1:] < upper_edge 

153 return filter