Skip to content

utils

multiprobe_framework.utils

rel2realpath(rel_path)

Simple helper function to change the relative path to a real path on user's computer.

Parameters:

Name Type Description Default
rel_path

the relative path to a file in a directory

required

Returns:

Type Description

a real full-path to the file to be read in

Source code in src/multiprobe_framework/utils.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def rel2realpath(rel_path):
    """
    Resolve a path given relative to this module's directory into an
    absolute, symlink-free path on the user's machine.

    :param rel_path: the relative path to a file in a directory

    :return: a real full-path to the file to be read in
    """
    # Directory containing this source file
    module_dir = os.path.dirname(os.path.abspath(__file__))

    # Join with the relative path and resolve any symbolic links
    return os.path.realpath(os.path.join(module_dir, rel_path))

load_config(config_path)

Load the configuration file from the given path.

Parameters:

Name Type Description Default
config_path

the path to the configuration file

required

Returns:

Type Description

the configuration file loaded as a dictionary

Source code in src/multiprobe_framework/utils.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def load_config(config_path):
    """
    Read a YAML configuration file from the given path.

    :param config_path: the path to the configuration file

    :return: the configuration file loaded as a dictionary
    """
    # Resolve the (possibly relative) path into an absolute one
    real_config_path = rel2realpath(config_path)

    # Parse the YAML file with the safe loader
    with open(real_config_path) as config_file:
        return yaml.safe_load(config_file)

get_specinds(spec)

Get indices from the given string, treating 'ka' as a single chunk and applying standard digit handling: - No digits: return chunk[0], chunk[1]. - One digit: append it to the second chunk. - Multiple digits: attach the first digit to chunk[0], the second to chunk[1].

Source code in src/multiprobe_framework/utils.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_specinds(spec):
    """
    Get indices from the given string, treating 'ka' as a single chunk
    and applying standard digit handling:
        - No digits: return chunk[0], chunk[1].
        - One digit: append it to the second chunk.
        - Multiple digits: attach the first digit to chunk[0],
          the second to chunk[1].

    :param spec: spectrum identifier string, e.g. "tk", "g1g2" or "kag1"

    :return: tuple of the two (possibly digit-suffixed) chunk strings
    """
    # Collect only letters from the input
    letters_only = re.sub(r"\d+", "", spec)
    # Group 'ka' as a chunk, or else take a single character
    chunks = re.findall(r"(?:ka|.)", letters_only)
    # Flatten ALL digits into one string. Previously only the first digit
    # group was inspected, so inputs such as "g1g2" (two separate digit
    # groups) fell into the one-digit branch and the second group's digit
    # was silently dropped; "g1g2" now yields ("g1", "g2") like "g12".
    digits = "".join(re.findall(r"\d+", spec))

    # Safely grab up to two chunks
    chunk_1 = chunks[0] if chunks else ""
    chunk_2 = chunks[1] if len(chunks) > 1 else ""

    if not digits:
        # No digits found
        return chunk_1, chunk_2
    if len(digits) == 1:
        # One digit (append to second chunk)
        return chunk_1, chunk_2 + digits
    # Multiple digits: attach first to chunk_1, second to chunk_2
    return chunk_1 + digits[0], chunk_2 + digits[1]

update_npz_file(file_path, update_dict)

Update an existing npz file with new data.

Parameters: file_path (str): Path to the npz file to be updated. update_dict (dict): Dictionary with data to update. Keys are the names of the arrays.

Source code in src/multiprobe_framework/utils.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def update_npz_file(file_path, update_dict):
    """
    Merge new arrays into the contents of an existing npz file.

    Note: despite the name, nothing is written back to disk; the merged
    dictionary is returned for the caller to save.

    Parameters:
    file_path (str): Path to the npz file to be updated.
    update_dict (dict): Dictionary with data to update. Keys are the names of
                        the arrays.

    :return: dict of array name -> array, with ``update_dict`` entries
             overriding arrays loaded from the file
    """
    # Use a context manager: np.load on an .npz returns an NpzFile that
    # keeps the underlying zip archive open until it is closed
    with np.load(file_path) as data:
        data_dict = {key: data[key] for key in data}

    # New entries override any existing arrays with the same name
    data_dict.update(update_dict)

    return data_dict

check_and_load_npz_file(scratch_directory, index, prefix='cls')

Wait for an NPZ file to exist and then load it.

Parameters:

Name Type Description Default
scratch_directory

the directory where the NPZ file is stored

required
index

the index of the NPZ file

required

Returns:

Type Description

the loaded NPZ file

Source code in src/multiprobe_framework/utils.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def check_and_load_npz_file(scratch_directory, index, prefix="cls"):
    """
    Wait for an NPZ file to exist and then load it.

    :param scratch_directory: the directory where the NPZ file is stored
    :param index: the index of the NPZ file
    :param prefix: filename prefix of the NPZ file (default "cls")

    :return: the loaded NPZ file
    """
    npz_file_path = f"{scratch_directory}/{prefix}_{index}.npz"

    # If the file is missing, wait 15 s and re-check once; raise if it
    # still has not appeared
    if not os.path.isfile(npz_file_path):
        LOGGER.info(f"Waiting for file: {npz_file_path}")
        time.sleep(15)
        if os.path.isfile(npz_file_path):
            LOGGER.info(f"File: {npz_file_path} exists now")
        else:
            LOGGER.error(f"File: {npz_file_path} does not exist")
            raise FileNotFoundError(f"File: {npz_file_path} does not exist")

    return np.load(npz_file_path)

ensure_dataset_exists(f, spec, num_reals, n_bin=None, dtype=np.float64, compression='lzf')

Ensure that the HDF5 dataset exists, creating it if necessary.

Parameters:

Name Type Description Default
f

the HDF5 file object

required
spec

the name of the dataset

required
num_reals

the number of realizations

required
n_bin

the number of bins

None
dtype

the datatype of the dataset

float64
compression

the compression algorithm to use

'lzf'

Returns:

Type Description

None

Source code in src/multiprobe_framework/utils.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def ensure_dataset_exists(
    f, spec, num_reals, n_bin=None, dtype=np.float64, compression="lzf"
):
    """
    Ensure that the HDF5 dataset exists, creating it if necessary.

    :param f: the HDF5 file object
    :param spec: the name of the dataset
    :param num_reals: the number of realizations
    :param n_bin: the number of bins
    :param dtype: the datatype of the dataset
    :param compression: the compression algorithm to use

    :return: None
    """
    if spec in f:
        LOGGER.info(f"Dataset {spec} already exists.")
        return

    # 2-D (realizations x bins) when num_reals is given, otherwise 1-D;
    # a maxshape of None along each axis lets the dataset grow later
    if num_reals is None:
        shape, maxshape = (n_bin,), (None,)
    else:
        shape, maxshape = (num_reals, n_bin), (None, None)

    f.create_dataset(
        name=spec,
        shape=shape,
        maxshape=maxshape,
        dtype=dtype,
        compression=compression,
    )

save_data_to_hdf5(f, spec, data, index)

Save the data to the specified index of the dataset.

Source code in src/multiprobe_framework/utils.py
178
179
180
181
182
183
184
185
186
def save_data_to_hdf5(f, spec, data, index):
    """Save the data to the specified index of the dataset."""
    # Best-effort write: failures are logged rather than raised so that a
    # single bad write does not abort the surrounding pipeline
    try:
        target = f[spec]
        if index is None:
            # Overwrite the whole dataset
            target[:] = data
        else:
            # Write only the requested slot
            target[index] = data
    except Exception as e:
        LOGGER.error(f"Error saving data for {spec} at index {index}: {e}")

get_spin(maps)

Simply use the number of maps in the list passed to determine the spin of the fields.

Parameters:

Name Type Description Default
maps

a list of maps

required

Returns:

Type Description

spin value of maps

Source code in src/multiprobe_framework/utils.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def get_spin(maps):
    """
    Simply use the number of maps in the list passed to determine the spin of
    the fields.

    :param maps: a list of maps

    :return: spin value of maps

    :raises ValueError: if the list does not contain exactly 1 or 2 maps
    """
    # One map -> spin-0 field; two maps -> spin-2 field.
    # (A leftover debug print of len(maps) was removed from library code.)
    if len(maps) == 1:
        return 0
    if len(maps) == 2:
        return 2
    raise ValueError("Pass either spin 0 or spin 2 maps.")

check_specind_exists(specind)

Check if the given specind exists in the pipeline.

Source code in src/multiprobe_framework/utils.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def check_specind_exists(specind):
    """
    Check if the given specind exists in the pipeline.

    :param specind: spectrum index string to validate

    :raises ValueError: if ``specind`` is not a recognised index
    """
    # Known indices; the order is preserved so the error message prints
    # the same list as before
    specind_list = [
        "t", "k", "e",
        "g1", "g2", "g3", "g4", "g5",
        "c", "l",
        "d1", "d2", "d3", "d4",
        "ka",
    ]
    if specind in specind_list:
        return
    raise ValueError(
        f"Invalid specind: {specind}, the available specinds currently "
        f"implemented are: {specind_list}"
    )

get_compression_vecs_moped(derivative_dict, params, inv_cov, ad=False)

This function computes the MOPED compression vectors given some inputs.

:inv_cov: best estimate of the inverse covariance matrix of the data.

Parameters:

Name Type Description Default
derivative_dict

dictionary of derivative values of Cl with respect to each parameter

required
params

parameters that are input to the theory code calculation

required

Returns:

Type Description

dictionary of compression vectors for each of the input parameters

Source code in src/multiprobe_framework/utils.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def get_compression_vecs_moped(derivative_dict, params, inv_cov, ad=False):
    """
    Compute the MOPED compression vectors by Gram-Schmidt
    orthogonalisation of the parameter derivatives.

    :param derivative_dict: dictionary of derivative values of Cl with respect
                            to each parameter
    :param params: parameters that are input to the theory code calculation
    :param inv_cov: best estimate of the inverse covariance matrix
    :param ad: not referenced by this implementation

    :return: dictionary of compression vectors for each of the input parameters
    """
    compression_vecs = {}

    for idx, name in enumerate(params):
        deriv = np.array(derivative_dict[name])
        numerator = inv_cov.dot(deriv)
        norm_sq = deriv.dot(inv_cov.dot(deriv))

        # Subtract the projections onto all previously built vectors
        # (empty for the first parameter, reproducing the base case)
        proj_vec = 0
        proj_norm = 0
        for prev in params[:idx]:
            coeff = deriv.dot(compression_vecs[prev])
            proj_vec = proj_vec + coeff * compression_vecs[prev]
            proj_norm = proj_norm + coeff**2

        compression_vecs[name] = (numerator - proj_vec) / np.sqrt(
            norm_sq - proj_norm
        )

    return compression_vecs

create_derivative_dict(func, params, paramnames)

Given an input JAX function, create a dictionary of derivatives at the input fiducial parameters.

Parameters:

Name Type Description Default
func

JAX function

required
params

input parameters

required
Source code in src/multiprobe_framework/utils.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def create_derivative_dict(func, params, paramnames):
    """
    Given an input JAX function, create a dictionary of derivatives at the
    input fiducial parameters.

    :param func: JAX function
    :param params: input parameters
    :param paramnames: names of the parameters, in the same order as
                       ``params``; used as the keys of the returned dict

    :return: dictionary mapping each parameter name to the corresponding
             column of the forward-mode Jacobian

    NOTE(review): the slicing assumes ``func(params)`` returns an array with
    two singleton trailing axes so that ``jacobian[:, 0, 0, i]`` is valid —
    confirm against callers.
    """
    # Forward-mode differentiation of the full function
    jacobian = jax.jacfwd(func)(params)

    # enumerate() pairs each name with its Jacobian column directly,
    # replacing the repeated O(n) paramnames.index() lookup (which also
    # misaligns columns if a name appears twice)
    return {name: jacobian[:, 0, 0, i] for i, name in enumerate(paramnames)}