Coverage for src/ufig/sysmaps_util.py: 93%

129 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 15:17 +0000

1# Copyright (C) 2016 ETH Zurich, Institute for Astronomy 

2 

3""" 

4Created on Nov 16, 2016 

5author: Tomasz Kacprzak 

6""" 

7 

8import os 

9 

10import numba as nb 

11import numpy as np 

12from astropy.io import fits 

13from cosmic_toolbox import logger 

14from scipy.special import gamma 

15 

16from ufig import io_util 

17 

18# from memory_profiler import profile 

19 

20LOGGER = logger.get_logger(__file__) 

21 

22 

23@nb.jit(nopython=True) 

24def chi_mean_loop(detect_image, n_contribute, arange_mu): 

25 for i in range(detect_image.shape[0]): 

26 for j in range(detect_image.shape[1]): 

27 if n_contribute[i, j] == 0: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 detect_image[i, j] = 0.0 

29 else: 

30 detect_image[i, j] = ( 

31 np.sqrt(detect_image[i, j]) - arange_mu[n_contribute[i, j]] 

32 ) / np.sqrt(n_contribute[i, j] - arange_mu[n_contribute[i, j]] ** 2) 

33 

34 

35@nb.jit(nopython=True) 

36def stack_detect_image(detect_image, n_contribute, image, weights): 

37 for i in range(image.shape[0]): 

38 for j in range(image.shape[1]): 

39 if (image[i, j] != 0.0) & (weights[i, j] != 0.0): 39 ↛ 38line 39 didn't jump to line 38 because the condition on line 39 was always true

40 detect_image[i, j] += image[i, j] ** 2 * weights[i, j] 

41 n_contribute[i, j] += 1.0 

42 

43 

44def get_hdf_location_exptime(par, band=None): 

45 """ 

46 Get the filename and dataset for the exposure time map 

47 

48 :param par: ctx().parameters structure 

49 :param band: which band to use (if None, use the single-band parameter values) 

50 :return: filepath, dataset: path and dataset index in hdf file 

51 """ 

52 

53 if par.sysmaps_type == "sysmaps_hdf": 

54 if band is None: 

55 filepath_h5 = par.exp_time_file_name 

56 else: 

57 filepath_h5 = par.exp_time_file_name_dict[band] 

58 

59 dataset = "data" 

60 

61 elif par.sysmaps_type == "sysmaps_hdf_combined": 

62 if band is None: 

63 filepath_h5 = par.filepath_sysmaps 

64 else: 

65 filepath_h5 = par.filepath_sysmaps_dict[band] 

66 

67 dataset = "map_expt" 

68 

69 else: 

70 raise ValueError( 

71 f"unknown sysmaps_type {par.sysmaps_type}" 

72 " (see common.py for available types)" 

73 ) 

74 

75 return filepath_h5, dataset 

76 

77 

78def get_hdf_location_bgrms(par, band=None): 

79 """ 

80 Get the filename and dataset for the background noise map 

81 

82 :param par: ctx().parameters structure 

83 :param band: which band to use (if None, use the single-band parameter values) 

84 :return: filepath, dataset: path and dataset index in hdf file 

85 """ 

86 

87 if par.sysmaps_type == "sysmaps_hdf": 

88 if band is None: 

89 filepath_h5 = par.bkg_rms_file_name 

90 else: 

91 filepath_h5 = par.bkg_rms_file_name_dict[band] 

92 

93 dataset = "data" 

94 

95 elif par.sysmaps_type == "sysmaps_hdf_combined": 

96 if band is None: 

97 filepath_h5 = par.filepath_sysmaps 

98 else: 

99 filepath_h5 = par.filepath_sysmaps_dict[band] 

100 

101 dataset = "map_bsig" 

102 

103 else: 

104 raise ValueError( 

105 f"unknown sysmaps_type {par.sysmaps_type}" 

106 " (see common.py for available types)" 

107 ) 

108 

109 return filepath_h5, dataset 

110 

111 

112def get_hdf_location_invvar(par, band=None): 

113 """ 

114 Get the filename and dataset for the inverse variance map 

115 

116 :param par: ctx().parameters structure 

117 :param band: which band to use (if None, use the single-band parameter values) 

118 :return: filepath, dataset: path and dataset index in hdf file 

119 """ 

120 

121 if par.sysmaps_type == "sysmaps_hdf": 

122 filepath_h5 = par.weight_image if band is None else par.weight_image_dict[band] 

123 dataset = "data" 

124 

125 elif par.sysmaps_type == "sysmaps_hdf_combined": 

126 if band is None: 

127 filepath_h5 = par.filepath_sysmaps 

128 else: 

129 filepath_h5 = par.filepath_sysmaps_dict[band] 

130 

131 dataset = "map_invv" 

132 

133 else: 

134 raise ValueError( 

135 f"unknown sysmaps_type {par.sysmaps_type}" 

136 " (see common.py for available types)" 

137 ) 

138 

139 return filepath_h5, dataset 

140 

141 

142def get_hdf_location_gain(par, band=None): 

143 """ 

144 Get the filename and dataset for the background noise map 

145 

146 :param par: ctx().parameters structure 

147 :param band: which band to use (if None, use the single-band parameter values) 

148 :return: filepath, dataset: path and dataset index in hdf file 

149 """ 

150 

151 if par.sysmaps_type == "sysmaps_hdf": 

152 if band is None: 

153 filepath_h5 = par.gain_map_file_name 

154 else: 

155 filepath_h5 = par.gain_map_file_name_dict[band] 

156 

157 dataset = "data" 

158 

159 elif par.sysmaps_type == "sysmaps_hdf_combined": 

160 if band is None: 

161 filepath_h5 = par.filepath_sysmaps 

162 else: 

163 filepath_h5 = par.filepath_sysmaps_dict[band] 

164 

165 dataset = "map_gain" 

166 

167 else: 

168 raise ValueError( 

169 f"unknown sysmaps_type {par.sysmaps_type}" 

170 " (see common.py for available types)" 

171 ) 

172 

173 return filepath_h5, dataset 

174 

175 

176def chi_mean_combination_lowmem(par): 

177 """ 

178 This creates a CHI-MEAN detection image, as is done by the DES pipeline, see 

179 https://iopscience.iop.org/article/10.3847/1538-4365/aab4f5/pdf 

180 (DOI: 10.3847/1538-4365/aab4f5), appendix B. 

181 

182 Copy of chi_mean_combination made to run with low memory 

183 

184 :param images_and_weights: iterable yielding images with weights to be combined 

185 :return: chi-mean detection image 

186 """ 

187 

188 for i, band in enumerate(par.sextractor_forced_photo_detection_bands): 

189 # load image 

190 image = io_util.load_image_chunks( 

191 file_name=par.image_name_dict[band], ext=0, dtype=np.float32 

192 ) 

193 

194 # load weight map 

195 filepath, dataset = get_hdf_location_invvar(par, band=band) 

196 weights = io_util.load_from_hdf5( 

197 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir 

198 ) 

199 

200 if i == 0: 

201 detect_image = np.zeros_like(image, dtype=np.float32) 

202 n_contribute = np.zeros_like(image, dtype=np.uint8) 

203 

204 stack_detect_image(detect_image, n_contribute, image, weights) 

205 del image 

206 del weights 

207 

208 n_contribute_unique = np.arange(50, dtype=np.float32) 

209 arange_mu = ( 

210 np.sqrt(2) 

211 * gamma((n_contribute_unique + 1) / 2) 

212 / gamma(n_contribute_unique / 2) 

213 ) 

214 

215 chi_mean_loop(detect_image, n_contribute, arange_mu) 

216 

217 return detect_image 

218 

219 

220def get_path_temp_sextractor_weight(par, band): 

221 """ 

222 Constructs the path to the temporary fits-file with the weights for SExtractor 

223 

224 :param par: ctx.parameters 

225 :param band: filter band 

226 :return: path 

227 """ 

228 dirpath_temp = io_util.get_abs_path( 

229 par.tempdir_weight_fits, root_path=par.tempdir_weight_fits, is_file=False 

230 ) 

231 path = os.path.join(dirpath_temp, par.image_name_dict[band] + "__temp_invvar.fits") 

232 return path 

233 

234 

235def write_temp_sextractor_weight(par, path_out, band=None, dtype=np.float32): 

236 """ 

237 Reads out the weight map (inverse variance) from hdf5 and writes it to fits, s.t. it 

238 can be used by SExtractor 

239 

240 :param par: ctx.parameters 

241 :param path_out: path where weight map will be stored 

242 :param band: filter band 

243 """ 

244 filepath, dataset = get_hdf_location_invvar(par, band=band) 

245 map_invv_photometry = io_util.load_from_hdf5( 

246 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir 

247 ).astype(dtype) 

248 fits.writeto(path_out, map_invv_photometry, overwrite=True) 

249 

250 

251# @profile 

252def get_detection_image(par): 

253 """ 

254 Constructs the detection image for SExtractor. In case of a single-band detection 

255 image, simply writes the weight map of the detection band to a fits file. For 

256 multi-band detection, the function computes the CHI-MEAN combination of the 

257 detection band images and their weights. 

258 :param par: ctx.parameters 

259 :return: path to detection image (fits), path to detection weights (fits or 'NONE'), 

260 weight type of detection image for SExtractor, either according to input 

261 parameters (single-band) or 'NONE' (multi-band detection image) 

262 """ 

263 

264 if len(par.sextractor_forced_photo_detection_bands) == 1: 

265 path_detection_image = par.image_name_dict[ 

266 par.sextractor_forced_photo_detection_bands[0] 

267 ] 

268 if par.weight_type == "NONE": 268 ↛ 273line 268 didn't jump to line 273 because the condition on line 268 was always true

269 path_detection_weight = "NONE" 

270 detection_weight_type = "NONE" 

271 

272 else: 

273 path_detection_weight = get_path_temp_sextractor_weight( 

274 par, par.sextractor_forced_photo_detection_bands[0] 

275 ) 

276 detection_weight_type = par.weight_type 

277 if not os.path.isfile(path_detection_weight): 

278 write_temp_sextractor_weight( 

279 par, 

280 path_detection_weight, 

281 band=par.sextractor_forced_photo_detection_bands[0], 

282 ) 

283 

284 else: 

285 dirpath_temp = io_util.get_abs_path( 

286 par.tempdir_weight_fits, root_path=par.tempdir_weight_fits, is_file=False 

287 ) 

288 filename_detection_image = "temp_detection_image_{}.fits".format( 

289 "".join(par.sextractor_forced_photo_detection_bands) 

290 ) 

291 path_detection_image = os.path.join(dirpath_temp, filename_detection_image) 

292 path_detection_weight = "NONE" 

293 detection_weight_type = "NONE" 

294 

295 if not os.path.isfile(path_detection_image): 295 ↛ 301line 295 didn't jump to line 301 because the condition on line 295 was always true

296 detection_image = chi_mean_combination_lowmem(par) 

297 header = fits.getheader(par.image_name_dict[par.filters[0]]) 

298 fits.writeto( 

299 path_detection_image, detection_image, header=header, overwrite=True 

300 ) 

301 return path_detection_image, path_detection_weight, detection_weight_type 

302 

303 

304def write_temp_sextractor_weights( 

305 par, dirpath_temp, overwrite_photo=True, overwrite_det=True, dtype=np.float32 

306): 

307 """ 

308 Write fits weight maps for SExtractor in the temp folder. 

309 

310 :param par: ctx().parameters structure 

311 :param dirpath_temp: temp dir where to write files 

312 :param overwrite_photo: if to overwrite photometry weight if exists 

313 :param overwrite_det: if to overwrite detection weight if exists 

314 

315 :return: filepath_fits_photometry, filepath_fits_detection: paths to decompressed 

316 weights 

317 """ 

318 

319 # ========== 

320 # Photometry 

321 # ========== 

322 

323 dirpath_temp = io_util.get_abs_path( 

324 dirpath_temp, root_path=dirpath_temp, is_file=False 

325 ) 

326 filepath_fits_photometry = os.path.join( 

327 dirpath_temp, par.image_name + "__temp_invvar.fits" 

328 ) 

329 filepath, dataset = get_hdf_location_invvar(par) 

330 

331 if filepath.endswith(".fits"): 

332 filepath_fits_photometry = filepath 

333 

334 elif (not os.path.isfile(filepath_fits_photometry)) or overwrite_photo: 334 ↛ 347line 334 didn't jump to line 347 because the condition on line 334 was always true

335 map_invv_photometry = io_util.load_from_hdf5( 

336 file_name=filepath, hdf5_keys=dataset, root_path=par.maps_remote_dir 

337 ) 

338 

339 fits.writeto( 

340 filepath_fits_photometry, map_invv_photometry.astype(dtype), overwrite=True 

341 ) 

342 

343 # ========= 

344 # Detection 

345 # ========= 

346 

347 if par.sextractor_use_forced_photo: 347 ↛ 353line 347 didn't jump to line 353 because the condition on line 347 was always true

348 LOGGER.warning( 

349 "This line should not be executed, use the force photometry plugin instead" 

350 ) 

351 filepath_fits_detection = None 

352 else: 

353 filepath_fits_detection = None 

354 

355 return filepath_fits_photometry, filepath_fits_detection