Coverage for src/ufig/sysmaps_util.py: 93%
129 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1# Copyright (C) 2016 ETH Zurich, Institute for Astronomy
3"""
4Created on Nov 16, 2016
5author: Tomasz Kacprzak
6"""
8import os
10import numba as nb
11import numpy as np
12from astropy.io import fits
13from cosmic_toolbox import logger
14from scipy.special import gamma
16from ufig import io_util
18# from memory_profiler import profile
20LOGGER = logger.get_logger(__file__)
23@nb.jit(nopython=True)
24def chi_mean_loop(detect_image, n_contribute, arange_mu):
25 for i in range(detect_image.shape[0]):
26 for j in range(detect_image.shape[1]):
27 if n_contribute[i, j] == 0: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 detect_image[i, j] = 0.0
29 else:
30 detect_image[i, j] = (
31 np.sqrt(detect_image[i, j]) - arange_mu[n_contribute[i, j]]
32 ) / np.sqrt(n_contribute[i, j] - arange_mu[n_contribute[i, j]] ** 2)
35@nb.jit(nopython=True)
36def stack_detect_image(detect_image, n_contribute, image, weights):
37 for i in range(image.shape[0]):
38 for j in range(image.shape[1]):
39 if (image[i, j] != 0.0) & (weights[i, j] != 0.0): 39 ↛ 38line 39 didn't jump to line 38 because the condition on line 39 was always true
40 detect_image[i, j] += image[i, j] ** 2 * weights[i, j]
41 n_contribute[i, j] += 1.0
44def get_hdf_location_exptime(par, band=None):
45 """
46 Get the filename and dataset for the exposure time map
47 :param par: ctx().parameters structure
48 :param band: which band to use (if None, use the single-band parameter values)
49 :return: filepath, dataset: path and dataset index in hdf file
50 """
52 if par.sysmaps_type == "sysmaps_hdf":
53 if band is None:
54 filepath_h5 = par.exp_time_file_name
55 else:
56 filepath_h5 = par.exp_time_file_name_dict[band]
58 dataset = "data"
60 elif par.sysmaps_type == "sysmaps_hdf_combined":
61 if band is None:
62 filepath_h5 = par.filepath_sysmaps
63 else:
64 filepath_h5 = par.filepath_sysmaps_dict[band]
66 dataset = "map_expt"
68 else:
69 raise ValueError(
70 f"unknown sysmaps_type {par.sysmaps_type}"
71 " (see common.py for available types)"
72 )
74 return filepath_h5, dataset
77def get_hdf_location_bgrms(par, band=None):
78 """
79 Get the filename and dataset for the background noise map
80 :param par: ctx().parameters structure
81 :param band: which band to use (if None, use the single-band parameter values)
82 :return: filepath, dataset: path and dataset index in hdf file
83 """
85 if par.sysmaps_type == "sysmaps_hdf":
86 if band is None:
87 filepath_h5 = par.bkg_rms_file_name
88 else:
89 filepath_h5 = par.bkg_rms_file_name_dict[band]
91 dataset = "data"
93 elif par.sysmaps_type == "sysmaps_hdf_combined":
94 if band is None:
95 filepath_h5 = par.filepath_sysmaps
96 else:
97 filepath_h5 = par.filepath_sysmaps_dict[band]
99 dataset = "map_bsig"
101 else:
102 raise ValueError(
103 f"unknown sysmaps_type {par.sysmaps_type}"
104 " (see common.py for available types)"
105 )
107 return filepath_h5, dataset
110def get_hdf_location_invvar(par, band=None):
111 """
112 Get the filename and dataset for the inverse variance map
113 :param par: ctx().parameters structure
114 :param band: which band to use (if None, use the single-band parameter values)
115 :return: filepath, dataset: path and dataset index in hdf file
116 """
118 if par.sysmaps_type == "sysmaps_hdf":
119 filepath_h5 = par.weight_image if band is None else par.weight_image_dict[band]
120 dataset = "data"
122 elif par.sysmaps_type == "sysmaps_hdf_combined":
123 if band is None:
124 filepath_h5 = par.filepath_sysmaps
125 else:
126 filepath_h5 = par.filepath_sysmaps_dict[band]
128 dataset = "map_invv"
130 else:
131 raise ValueError(
132 f"unknown sysmaps_type {par.sysmaps_type}"
133 " (see common.py for available types)"
134 )
136 return filepath_h5, dataset
139def get_hdf_location_gain(par, band=None):
140 """
141 Get the filename and dataset for the background noise map
142 :param par: ctx().parameters structure
143 :param band: which band to use (if None, use the single-band parameter values)
144 :return: filepath, dataset: path and dataset index in hdf file
145 """
147 if par.sysmaps_type == "sysmaps_hdf":
148 if band is None:
149 filepath_h5 = par.gain_map_file_name
150 else:
151 filepath_h5 = par.gain_map_file_name_dict[band]
153 dataset = "data"
155 elif par.sysmaps_type == "sysmaps_hdf_combined":
156 if band is None:
157 filepath_h5 = par.filepath_sysmaps
158 else:
159 filepath_h5 = par.filepath_sysmaps_dict[band]
161 dataset = "map_gain"
163 else:
164 raise ValueError(
165 f"unknown sysmaps_type {par.sysmaps_type}"
166 " (see common.py for available types)"
167 )
169 return filepath_h5, dataset
172def chi_mean_combination_lowmem(par):
173 """
174 This creates a CHI-MEAN detection image, as is done by the DES pipeline, see
175 https://iopscience.iop.org/article/10.3847/1538-4365/aab4f5/pdf
176 (DOI: 10.3847/1538-4365/aab4f5), appendix B.
178 Copy of chi_mean_combination made to run with low memory
180 :param images_and_weights: iterable yielding images with weights to be combined
181 :return: chi-mean detection image
182 """
184 for i, band in enumerate(par.sextractor_forced_photo_detection_bands):
185 # load image
186 image = io_util.load_image_chunks(
187 file_name=par.image_name_dict[band], ext=0, dtype=np.float32
188 )
190 # load weight map
191 filepath, dataset = get_hdf_location_invvar(par, band=band)
192 weights = io_util.load_from_hdf5(
193 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir
194 )
196 if i == 0:
197 detect_image = np.zeros_like(image, dtype=np.float32)
198 n_contribute = np.zeros_like(image, dtype=np.uint8)
200 stack_detect_image(detect_image, n_contribute, image, weights)
201 del image
202 del weights
204 n_contribute_unique = np.arange(50, dtype=np.float32)
205 arange_mu = (
206 np.sqrt(2)
207 * gamma((n_contribute_unique + 1) / 2)
208 / gamma(n_contribute_unique / 2)
209 )
211 chi_mean_loop(detect_image, n_contribute, arange_mu)
213 return detect_image
216def get_path_temp_sextractor_weight(par, band):
217 """
218 Constructs the path to the temporary fits-file with the weights for SExtractor
219 :param par: ctx.parameters
220 :param band: filter band
221 :return: path
222 """
223 dirpath_temp = io_util.get_abs_path(
224 par.tempdir_weight_fits, root_path=par.tempdir_weight_fits, is_file=False
225 )
226 path = os.path.join(dirpath_temp, par.image_name_dict[band] + "__temp_invvar.fits")
227 return path
230def write_temp_sextractor_weight(par, path_out, band=None, dtype=np.float32):
231 """
232 Reads out the weight map (inverse variance) from hdf5 and writes it to fits, s.t. it
233 can be used by SExtractor
234 :param par: ctx.parameters
235 :param path_out: path where weight map will be stored
236 :param band: filter band
237 """
238 filepath, dataset = get_hdf_location_invvar(par, band=band)
239 map_invv_photometry = io_util.load_from_hdf5(
240 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir
241 ).astype(dtype)
242 fits.writeto(path_out, map_invv_photometry, overwrite=True)
245# @profile
246def get_detection_image(par):
247 """
248 Constructs the detection image for SExtractor. In case of a single-band detection
249 image, simply writes the weight map of the detection band to a fits file. For
250 multi-band detection, the function computes the CHI-MEAN combination of the
251 detection band images and their weights.
252 :param par: ctx.parameters
253 :return: path to detection image (fits), path to detection weights (fits or 'NONE'),
254 weight type of detection image for SExtractor, either according to input
255 parameters (single-band) or 'NONE' (multi-band detection image)
256 """
258 if len(par.sextractor_forced_photo_detection_bands) == 1:
259 path_detection_image = par.image_name_dict[
260 par.sextractor_forced_photo_detection_bands[0]
261 ]
262 if par.weight_type == "NONE": 262 ↛ 267line 262 didn't jump to line 267 because the condition on line 262 was always true
263 path_detection_weight = "NONE"
264 detection_weight_type = "NONE"
266 else:
267 path_detection_weight = get_path_temp_sextractor_weight(
268 par, par.sextractor_forced_photo_detection_bands[0]
269 )
270 detection_weight_type = par.weight_type
271 if not os.path.isfile(path_detection_weight):
272 write_temp_sextractor_weight(
273 par,
274 path_detection_weight,
275 band=par.sextractor_forced_photo_detection_bands[0],
276 )
278 else:
279 dirpath_temp = io_util.get_abs_path(
280 par.tempdir_weight_fits, root_path=par.tempdir_weight_fits, is_file=False
281 )
282 filename_detection_image = "temp_detection_image_{}.fits".format(
283 "".join(par.sextractor_forced_photo_detection_bands)
284 )
285 path_detection_image = os.path.join(dirpath_temp, filename_detection_image)
286 path_detection_weight = "NONE"
287 detection_weight_type = "NONE"
289 if not os.path.isfile(path_detection_image): 289 ↛ 295line 289 didn't jump to line 295 because the condition on line 289 was always true
290 detection_image = chi_mean_combination_lowmem(par)
291 header = fits.getheader(par.image_name_dict[par.filters[0]])
292 fits.writeto(
293 path_detection_image, detection_image, header=header, overwrite=True
294 )
295 return path_detection_image, path_detection_weight, detection_weight_type
298def write_temp_sextractor_weights(
299 par, dirpath_temp, overwrite_photo=True, overwrite_det=True, dtype=np.float32
300):
301 """
302 Write fits weight maps for SExtractor in the temp folder.
304 :param par: ctx().parameters structure
305 :param dirpath_temp: temp dir where to write files
306 :param overwrite_photo: if to overwrite photometry weight if exists
307 :param overwrite_det: if to overwrite detection weight if exists
309 :return: filepath_fits_photometry, filepath_fits_detection: paths to decompressed
310 weights
311 """
313 # ==========
314 # Photometry
315 # ==========
317 dirpath_temp = io_util.get_abs_path(
318 dirpath_temp, root_path=dirpath_temp, is_file=False
319 )
320 filepath_fits_photometry = os.path.join(
321 dirpath_temp, par.image_name + "__temp_invvar.fits"
322 )
323 filepath, dataset = get_hdf_location_invvar(par)
325 if filepath.endswith(".fits"):
326 filepath_fits_photometry = filepath
328 elif (not os.path.isfile(filepath_fits_photometry)) or overwrite_photo: 328 ↛ 341line 328 didn't jump to line 341 because the condition on line 328 was always true
329 map_invv_photometry = io_util.load_from_hdf5(
330 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir
331 )
333 fits.writeto(
334 filepath_fits_photometry, map_invv_photometry.astype(dtype), overwrite=True
335 )
337 # =========
338 # Detection
339 # =========
341 if par.sextractor_use_forced_photo: 341 ↛ 347line 341 didn't jump to line 347 because the condition on line 341 was always true
342 LOGGER.warning(
343 "This line should not be executed, use the force photometry plugin instead"
344 )
345 filepath_fits_detection = None
346 else:
347 filepath_fits_detection = None
349 return filepath_fits_photometry, filepath_fits_detection