Coverage for src/ufig/sysmaps_util.py: 93%
129 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
1# Copyright (C) 2016 ETH Zurich, Institute for Astronomy
3"""
4Created on Nov 16, 2016
5author: Tomasz Kacprzak
6"""
8import os
10import numba as nb
11import numpy as np
12from astropy.io import fits
13from cosmic_toolbox import logger
14from scipy.special import gamma
16from ufig import io_util
18# from memory_profiler import profile
20LOGGER = logger.get_logger(__file__)
23@nb.jit(nopython=True)
24def chi_mean_loop(detect_image, n_contribute, arange_mu):
25 for i in range(detect_image.shape[0]):
26 for j in range(detect_image.shape[1]):
27 if n_contribute[i, j] == 0: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 detect_image[i, j] = 0.0
29 else:
30 detect_image[i, j] = (
31 np.sqrt(detect_image[i, j]) - arange_mu[n_contribute[i, j]]
32 ) / np.sqrt(n_contribute[i, j] - arange_mu[n_contribute[i, j]] ** 2)
35@nb.jit(nopython=True)
36def stack_detect_image(detect_image, n_contribute, image, weights):
37 for i in range(image.shape[0]):
38 for j in range(image.shape[1]):
39 if (image[i, j] != 0.0) & (weights[i, j] != 0.0): 39 ↛ 38line 39 didn't jump to line 38 because the condition on line 39 was always true
40 detect_image[i, j] += image[i, j] ** 2 * weights[i, j]
41 n_contribute[i, j] += 1.0
44def get_hdf_location_exptime(par, band=None):
45 """
46 Get the filename and dataset for the exposure time map
48 :param par: ctx().parameters structure
49 :param band: which band to use (if None, use the single-band parameter values)
50 :return: filepath, dataset: path and dataset index in hdf file
51 """
53 if par.sysmaps_type == "sysmaps_hdf":
54 if band is None:
55 filepath_h5 = par.exp_time_file_name
56 else:
57 filepath_h5 = par.exp_time_file_name_dict[band]
59 dataset = "data"
61 elif par.sysmaps_type == "sysmaps_hdf_combined":
62 if band is None:
63 filepath_h5 = par.filepath_sysmaps
64 else:
65 filepath_h5 = par.filepath_sysmaps_dict[band]
67 dataset = "map_expt"
69 else:
70 raise ValueError(
71 f"unknown sysmaps_type {par.sysmaps_type}"
72 " (see common.py for available types)"
73 )
75 return filepath_h5, dataset
78def get_hdf_location_bgrms(par, band=None):
79 """
80 Get the filename and dataset for the background noise map
82 :param par: ctx().parameters structure
83 :param band: which band to use (if None, use the single-band parameter values)
84 :return: filepath, dataset: path and dataset index in hdf file
85 """
87 if par.sysmaps_type == "sysmaps_hdf":
88 if band is None:
89 filepath_h5 = par.bkg_rms_file_name
90 else:
91 filepath_h5 = par.bkg_rms_file_name_dict[band]
93 dataset = "data"
95 elif par.sysmaps_type == "sysmaps_hdf_combined":
96 if band is None:
97 filepath_h5 = par.filepath_sysmaps
98 else:
99 filepath_h5 = par.filepath_sysmaps_dict[band]
101 dataset = "map_bsig"
103 else:
104 raise ValueError(
105 f"unknown sysmaps_type {par.sysmaps_type}"
106 " (see common.py for available types)"
107 )
109 return filepath_h5, dataset
112def get_hdf_location_invvar(par, band=None):
113 """
114 Get the filename and dataset for the inverse variance map
116 :param par: ctx().parameters structure
117 :param band: which band to use (if None, use the single-band parameter values)
118 :return: filepath, dataset: path and dataset index in hdf file
119 """
121 if par.sysmaps_type == "sysmaps_hdf":
122 filepath_h5 = par.weight_image if band is None else par.weight_image_dict[band]
123 dataset = "data"
125 elif par.sysmaps_type == "sysmaps_hdf_combined":
126 if band is None:
127 filepath_h5 = par.filepath_sysmaps
128 else:
129 filepath_h5 = par.filepath_sysmaps_dict[band]
131 dataset = "map_invv"
133 else:
134 raise ValueError(
135 f"unknown sysmaps_type {par.sysmaps_type}"
136 " (see common.py for available types)"
137 )
139 return filepath_h5, dataset
142def get_hdf_location_gain(par, band=None):
143 """
144 Get the filename and dataset for the background noise map
146 :param par: ctx().parameters structure
147 :param band: which band to use (if None, use the single-band parameter values)
148 :return: filepath, dataset: path and dataset index in hdf file
149 """
151 if par.sysmaps_type == "sysmaps_hdf":
152 if band is None:
153 filepath_h5 = par.gain_map_file_name
154 else:
155 filepath_h5 = par.gain_map_file_name_dict[band]
157 dataset = "data"
159 elif par.sysmaps_type == "sysmaps_hdf_combined":
160 if band is None:
161 filepath_h5 = par.filepath_sysmaps
162 else:
163 filepath_h5 = par.filepath_sysmaps_dict[band]
165 dataset = "map_gain"
167 else:
168 raise ValueError(
169 f"unknown sysmaps_type {par.sysmaps_type}"
170 " (see common.py for available types)"
171 )
173 return filepath_h5, dataset
176def chi_mean_combination_lowmem(par):
177 """
178 This creates a CHI-MEAN detection image, as is done by the DES pipeline, see
179 https://iopscience.iop.org/article/10.3847/1538-4365/aab4f5/pdf
180 (DOI: 10.3847/1538-4365/aab4f5), appendix B.
182 Copy of chi_mean_combination made to run with low memory
184 :param images_and_weights: iterable yielding images with weights to be combined
185 :return: chi-mean detection image
186 """
188 for i, band in enumerate(par.sextractor_forced_photo_detection_bands):
189 # load image
190 image = io_util.load_image_chunks(
191 file_name=par.image_name_dict[band], ext=0, dtype=np.float32
192 )
194 # load weight map
195 filepath, dataset = get_hdf_location_invvar(par, band=band)
196 weights = io_util.load_from_hdf5(
197 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir
198 )
200 if i == 0:
201 detect_image = np.zeros_like(image, dtype=np.float32)
202 n_contribute = np.zeros_like(image, dtype=np.uint8)
204 stack_detect_image(detect_image, n_contribute, image, weights)
205 del image
206 del weights
208 n_contribute_unique = np.arange(50, dtype=np.float32)
209 arange_mu = (
210 np.sqrt(2)
211 * gamma((n_contribute_unique + 1) / 2)
212 / gamma(n_contribute_unique / 2)
213 )
215 chi_mean_loop(detect_image, n_contribute, arange_mu)
217 return detect_image
220def get_path_temp_sextractor_weight(par, band):
221 """
222 Constructs the path to the temporary fits-file with the weights for SExtractor
224 :param par: ctx.parameters
225 :param band: filter band
226 :return: path
227 """
228 dirpath_temp = io_util.get_abs_path(
229 par.tempdir_weight_fits, root_path=par.tempdir_weight_fits, is_file=False
230 )
231 path = os.path.join(dirpath_temp, par.image_name_dict[band] + "__temp_invvar.fits")
232 return path
235def write_temp_sextractor_weight(par, path_out, band=None, dtype=np.float32):
236 """
237 Reads out the weight map (inverse variance) from hdf5 and writes it to fits, s.t. it
238 can be used by SExtractor
240 :param par: ctx.parameters
241 :param path_out: path where weight map will be stored
242 :param band: filter band
243 """
244 filepath, dataset = get_hdf_location_invvar(par, band=band)
245 map_invv_photometry = io_util.load_from_hdf5(
246 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir
247 ).astype(dtype)
248 fits.writeto(path_out, map_invv_photometry, overwrite=True)
251# @profile
252def get_detection_image(par):
253 """
254 Constructs the detection image for SExtractor. In case of a single-band detection
255 image, simply writes the weight map of the detection band to a fits file. For
256 multi-band detection, the function computes the CHI-MEAN combination of the
257 detection band images and their weights.
258 :param par: ctx.parameters
259 :return: path to detection image (fits), path to detection weights (fits or 'NONE'),
260 weight type of detection image for SExtractor, either according to input
261 parameters (single-band) or 'NONE' (multi-band detection image)
262 """
264 if len(par.sextractor_forced_photo_detection_bands) == 1:
265 path_detection_image = par.image_name_dict[
266 par.sextractor_forced_photo_detection_bands[0]
267 ]
268 if par.weight_type == "NONE": 268 ↛ 273line 268 didn't jump to line 273 because the condition on line 268 was always true
269 path_detection_weight = "NONE"
270 detection_weight_type = "NONE"
272 else:
273 path_detection_weight = get_path_temp_sextractor_weight(
274 par, par.sextractor_forced_photo_detection_bands[0]
275 )
276 detection_weight_type = par.weight_type
277 if not os.path.isfile(path_detection_weight):
278 write_temp_sextractor_weight(
279 par,
280 path_detection_weight,
281 band=par.sextractor_forced_photo_detection_bands[0],
282 )
284 else:
285 dirpath_temp = io_util.get_abs_path(
286 par.tempdir_weight_fits, root_path=par.tempdir_weight_fits, is_file=False
287 )
288 filename_detection_image = "temp_detection_image_{}.fits".format(
289 "".join(par.sextractor_forced_photo_detection_bands)
290 )
291 path_detection_image = os.path.join(dirpath_temp, filename_detection_image)
292 path_detection_weight = "NONE"
293 detection_weight_type = "NONE"
295 if not os.path.isfile(path_detection_image): 295 ↛ 301line 295 didn't jump to line 301 because the condition on line 295 was always true
296 detection_image = chi_mean_combination_lowmem(par)
297 header = fits.getheader(par.image_name_dict[par.filters[0]])
298 fits.writeto(
299 path_detection_image, detection_image, header=header, overwrite=True
300 )
301 return path_detection_image, path_detection_weight, detection_weight_type
304def write_temp_sextractor_weights(
305 par, dirpath_temp, overwrite_photo=True, overwrite_det=True, dtype=np.float32
306):
307 """
308 Write fits weight maps for SExtractor in the temp folder.
310 :param par: ctx().parameters structure
311 :param dirpath_temp: temp dir where to write files
312 :param overwrite_photo: if to overwrite photometry weight if exists
313 :param overwrite_det: if to overwrite detection weight if exists
315 :return: filepath_fits_photometry, filepath_fits_detection: paths to decompressed
316 weights
317 """
319 # ==========
320 # Photometry
321 # ==========
323 dirpath_temp = io_util.get_abs_path(
324 dirpath_temp, root_path=dirpath_temp, is_file=False
325 )
326 filepath_fits_photometry = os.path.join(
327 dirpath_temp, par.image_name + "__temp_invvar.fits"
328 )
329 filepath, dataset = get_hdf_location_invvar(par)
331 if filepath.endswith(".fits"):
332 filepath_fits_photometry = filepath
334 elif (not os.path.isfile(filepath_fits_photometry)) or overwrite_photo: 334 ↛ 347line 334 didn't jump to line 347 because the condition on line 334 was always true
335 map_invv_photometry = io_util.load_from_hdf5(
336 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir
337 )
339 fits.writeto(
340 filepath_fits_photometry, map_invv_photometry.astype(dtype), overwrite=True
341 )
343 # =========
344 # Detection
345 # =========
347 if par.sextractor_use_forced_photo: 347 ↛ 353line 347 didn't jump to line 353 because the condition on line 347 was always true
348 LOGGER.warning(
349 "This line should not be executed, use the force photometry plugin instead"
350 )
351 filepath_fits_detection = None
352 else:
353 filepath_fits_detection = None
355 return filepath_fits_photometry, filepath_fits_detection