# Copyright (C) 2019 ETH Zurich
# Institute for Particle Physics and Astrophysics
# Author: Dominik Zuercher
import numpy as np
import healpy as hp
[docs]def context():
"""
Defines the paramters used by the plugin
"""
stat_type = 'convergence'
required = ['lmax', 'CrossCLs_sliced_bins', 'CrossCLs_min',
'CrossCLs_max', 'n_tomo_bins', 'selected_l_range',
'CrossCLs_custom', 'bin', 'weight_cls']
defaults = [1000, 15, 100, 1000, 4, '0-10000', [], 0, False]
types = ['int', 'int', 'int', 'int', 'int', 'str',
'list', 'int', 'bool']
return required, defaults, types, stat_type
[docs]def CrossCLs(map1, map2, weights1, weights2, ctx):
"""
Calculates the Cross Angular Power spectrum of two convergence maps.
:param map1: A Healpix convergence map
:param map2: A second Healpix convergence map
:param weights1: A Healpix map with pixel weights
:param weights2: A second Healpix map with pixel weights
:param ctx: Context instance
:return: Cross CLs
"""
lmax = ctx['lmax'] - 1
# check if weights are set
if (not ctx['weight_cls']) | (len(weights1) == 0):
weights1_ = np.ones_like(map1, dtype=ctx['prec'])
weights1_[np.isclose(weights1, 0.0)] = 0.0
else:
weights1_ = weights1
if (not ctx['weight_cls']) | (len(weights2) == 0):
weights2_ = np.ones_like(map2, dtype=ctx['prec'])
weights2_[np.isclose(weights2, 0.0)] = 0.0
else:
weights2_ = weights2
# proper weight masking
select_seen = weights1_ > 0
select_unseen = ~select_seen
select_unseen |= map1 == hp.UNSEEN
map1_w = (map1 * weights1_).astype(np.float64)
map1_w[select_unseen] = hp.UNSEEN
select_seen = weights2_ > 0
select_unseen = ~select_seen
select_unseen |= map2 == hp.UNSEEN
map2_w = (map2 * weights2_).astype(np.float64)
map2_w[select_unseen] = hp.UNSEEN
cls = np.array(hp.sphtfunc.anafast(map1_w, map2_w, lmax=lmax))
cls = cls.reshape(1, cls.size)
return cls
[docs]def process(data, ctx, scale_to_unity=False):
if scale_to_unity:
data *= 1e4
return data
[docs]def slice(ctx):
# number of datavectors for each scale
mult = 1
# number of scales
num_of_scales = 1
# either mean or sum, for how to assemble the data into the bins
operation = 'mean'
if len(ctx['CrossCLs_custom']) > 0:
n_bins_sliced = len(ctx['CrossCLs_custom'])
else:
n_bins_sliced = ctx['CrossCLs_sliced_bins']
return num_of_scales, n_bins_sliced, operation, mult
[docs]def decide_binning_scheme(data, meta, bin, ctx):
# Perform simple equal width splitting for Cls.
if len(ctx['CrossCLs_custom']) > 0:
edges = np.asarray(ctx['CrossCLs_custom'])
num_of_scales = 1
bin_centers = np.zeros((num_of_scales, edges.shape[0]))
bin_edge_indices = np.zeros((num_of_scales, edges.shape[0] + 1))
lower_edges = edges[:, 0]
upper_edges = edges[:, 1]
bin_edge_indices[0, :-1] = lower_edges
bin_edge_indices[0, -1] = upper_edges[-1]
id_larger = bin_edge_indices > ctx['lmax']
if np.sum(id_larger) > 1:
raise Exception(
"Your custom binning scheme requires more multipols than "
"given in the data!")
elif id_larger[0, -1]:
bin_edge_indices[0, -1] = ctx['lmax']
bin_centers[0] = bin_edge_indices[0, :-1] \
+ 0.5 * (bin_edge_indices[0, 1:] - bin_edge_indices[0, :-1])
else:
range_edges = [ctx['CrossCLs_min'], ctx['CrossCLs_max']]
num_of_scales = 1
n_bins_sliced = ctx['CrossCLs_sliced_bins']
bin_centers = np.zeros((num_of_scales, n_bins_sliced))
bin_edge_indices = np.zeros((num_of_scales, n_bins_sliced + 1))
# Cut out desired l range
minimum = range_edges[0]
maximum = range_edges[1]
diff = maximum - minimum
per_bin = diff // n_bins_sliced
remain = diff % n_bins_sliced
remain_front = remain // 2
remain_back = remain_front + remain % 2
# Decide on edge indices
bin_edge_indices_temp = np.arange(
remain_front + minimum, maximum - remain_back, per_bin)
bin_edge_indices_temp[0] -= remain_front
bin_edge_indices_temp = np.append(bin_edge_indices_temp, maximum)
# Decide on central bin values
bin_centers_temp = np.zeros(0)
for jj in range(len(bin_edge_indices_temp) - 1):
bin_centers_temp = np.append(
bin_centers_temp,
np.nanmean(bin_edge_indices_temp[jj:jj + 2]))
# For consistency with other binning scheme
# assign same binning to all scales
for scale in range(num_of_scales):
bin_centers[scale, :] = bin_centers_temp
bin_edge_indices[scale, :] = bin_edge_indices_temp
return bin_edge_indices, bin_centers
[docs]def filter(ctx):
bin_edge_indices = decide_binning_scheme(None, None, None, ctx)[0][0]
if ',' in ctx['selected_l_range']:
# assuming list of l ranges. One for each bin combination
if ctx['n_tomo_bins'] == 0:
raise Exception(
"Passed a list of l-ranges for non-tomographic data vector.")
range = ctx['selected_l_range'].split(',')[ctx['bin']]
else:
range = ctx['selected_l_range']
lower_edge = int(range.split('-')[0])
upper_edge = int(range.split('-')[1])
filter = bin_edge_indices[:-1] > lower_edge
filter &= bin_edge_indices[1:] < upper_edge
return filter