Coverage for src/ufig/plugins/run_sextractor_forced_photometry.py: 88%

107 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1# Copyright (c) 2016 ETH Zurich, Institute of Astronomy, Cosmology Research Group 

2 

3""" 

4Created on Jul 12, 2016 

5@author: Joerg Herbel 

6""" 

7 

8import os 

9import subprocess 

10import tempfile 

11 

12import h5py 

13import numpy as np 

14from astropy.io import fits 

15from cosmic_toolbox import arraytools as at 

16from cosmic_toolbox import logger 

17from ivy.plugin.base_plugin import BasePlugin 

18from pkg_resources import resource_filename 

19 

20import ufig 

21from ufig import array_util, sysmaps_util 

22from ufig.plugins.write_image import get_seeing_value 

23 

24LOGGER = logger.get_logger(__file__) 

25 

26NAME = "run SourceExtractor (forced-photo)" 

27HDF5_COMPRESS = {"compression": "gzip", "compression_opts": 9, "shuffle": True} 

28 

29 

30def convert_fits_to_hdf(filepath_fits, fits_ext=2): 

31 try: 

32 cat = ( 

33 np.array(fits.getdata(filepath_fits, ext=fits_ext)) 

34 .byteswap() 

35 .newbyteorder() 

36 ) 

37 except Exception as errmsg: # pragma: no cover 

38 LOGGER.error(f"failed to fits-open {filepath_fits}, already hdf?") 

39 LOGGER.error(errmsg) 

40 

41 os.remove(filepath_fits) 

42 

43 cat = array_util.rec_float64_to_float32(cat) 

44 

45 at.save_hdf_cols(filepath_fits, cat, compression=HDF5_COMPRESS) 

46 

47 LOGGER.info(f"converted to hdf: {filepath_fits}") 

48 

49 

50def checkimages_to_hdf( 

51 checkimage_type, 

52 checkimages_names_fits, 

53 checkimages_names_hdf5, 

54 kw_dataset=None, 

55): 

56 if kw_dataset is None: 56 ↛ 62line 56 didn't jump to line 62 because the condition on line 56 was always true

57 kw_dataset = { 

58 "compression": "gzip", 

59 "compression_opts": 9, 

60 "shuffle": True, 

61 } 

62 if len(checkimages_names_hdf5) > 0: 

63 for c, f_fits, f_hdf5 in zip( 

64 checkimage_type, checkimages_names_fits, checkimages_names_hdf5 

65 ): 

66 img = np.array(fits.getdata(f_fits)) 

67 with h5py.File(f_hdf5, "w") as f: 

68 f.create_dataset(name=c, data=img, **kw_dataset) 

69 os.remove(f_fits) 

70 LOGGER.info( 

71 f"converted checkimage={c} dtype={str(img.dtype)} {f_fits} -> {f_hdf5}" 

72 ) 

73 

74 

75def enforce_abs_path(path): 

76 """ 

77 Build an absolute path using the path of the SExtractor directory in UFig. In case 

78 the input is already a path (and not only a filename), it is left unchanged. 

79 

80 :param path: Input path 

81 :return: Absolute path 

82 """ 

83 

84 if path == os.path.basename(path): 84 ↛ 87line 84 didn't jump to line 87 because the condition on line 84 was always true

85 path = resource_filename(ufig.__name__, "res/sextractor/" + path) 

86 

87 return path 

88 

89 

90def kwarg_to_sextractor_arg(key, value): 

91 """ 

92 Construct a SExtractor command line argument from a keyword argument. The key of the 

93 keyword argument must be a valid SExtractor parameter (modulo upper cases). The 

94 value must convertible to a string. It can also be an iterable containing only 

95 objects that can be converted to a string. 

96 

97 :param key: Key 

98 :param value: Value 

99 

100 :return: SExtractor command line argument as a list in the form 

101 [-<PARAM_NAME>, VALUE] 

102 """ 

103 

104 path_keys = ("starnnw_name", "parameters_name", "filter_name") 

105 

106 if str(value) != value: 

107 try: 

108 value = map(str, value) 

109 except TypeError: 

110 value = [str(value)] 

111 value = ",".join(value) 

112 

113 if key in path_keys: 

114 value = enforce_abs_path(value) 

115 

116 sextractor_arg = ["-" + key.upper(), value] 

117 

118 return sextractor_arg 

119 

120 

121def build_sextractor_cmd(binary_name, image_name, config_name, **kwargs): 

122 """ 

123 Construct a list of strings which make up a valid command line argument to call 

124 SExtractor. 

125 

126 :param binary_name: Path of SExtractor executable 

127 :param image_name: Path(s) of image(s) on which SExtractor will run 

128 :param config_name: Path of SExtractor configuration file 

129 :param kwargs: Keyword arguments that can be converted to SExtractor command line 

130 arguments 

131 

132 :return: Command to call SExtractor as a list of strings 

133 """ 

134 

135 config_name = enforce_abs_path(config_name) 

136 

137 if str(image_name) != image_name: 137 ↛ 140line 137 didn't jump to line 140 because the condition on line 137 was always true

138 image_name = ",".join(image_name) 

139 

140 cmd = [binary_name, image_name, "-c", config_name] 

141 

142 for key in kwargs: 

143 cmd += kwarg_to_sextractor_arg(key, kwargs[key]) 

144 

145 return cmd 

146 

147 

148def get_checkimages( 

149 sextractor_catalog_name, sextractor_checkimages, sextractor_checkimages_suffixes 

150): 

151 catalog_name_short = sextractor_catalog_name.rsplit(".", 1)[0] 

152 checkimages_names = [] 

153 checkimages_names_hdf5 = [] 

154 checkimages = sextractor_checkimages 

155 

156 for suffix in sextractor_checkimages_suffixes: 

157 checkimage_name = catalog_name_short + suffix 

158 checkimage_name_hdf5 = checkimage_name.replace(".fits", ".h5") 

159 checkimages_names += [checkimage_name] 

160 checkimages_names_hdf5 += [checkimage_name_hdf5] 

161 

162 if len(checkimages_names) == 0: 

163 checkimages_names = ["NONE"] 

164 checkimages = ["NONE"] 

165 

166 return checkimages, checkimages_names, checkimages_names_hdf5 

167 

168 

169def run_sextractor(binary_name, image_name, config_name, **kwargs): 

170 """ 

171 Run SExtractor by spawning a subprocess. 

172 

173 :param binary_name: Path of SExtractor executable 

174 :param image_name: Path(s) of image(s) on which SExtractor will run 

175 :param config_name: Path of SExtractor configuration file 

176 :param kwargs: Keyword arguments that can be converted to SExtractor command line 

177 arguments 

178 """ 

179 cmd = build_sextractor_cmd(binary_name, image_name, config_name, **kwargs) 

180 LOGGER.info(" ".join(cmd)) 

181 subprocess.check_call(cmd, stderr=subprocess.STDOUT) 

182 

183 

184class Plugin(BasePlugin): 

185 """ 

186 Run SExtractor in forced-photometry mode. 

187 """ 

188 

189 def __call__(self): 

190 par = self.ctx.parameters 

191 

192 sextractor_catalog_name = par.sextractor_forced_photo_catalog_name_dict[ 

193 self.ctx.current_filter 

194 ] 

195 

196 if self.ctx.parameters.sextractor_use_temp: 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true

197 sextractor_catalog_name = os.path.join( 

198 tempfile.mkdtemp("sextractor"), 

199 os.path.split(sextractor_catalog_name)[-1], 

200 ) 

201 ( 

202 path_detection_image, 

203 path_detection_weight, 

204 detection_weight_type, 

205 ) = sysmaps_util.get_detection_image(par) 

206 remove_detection_image = (self.ctx.current_filter == par.filters[-1]) & ( 

207 len(par.sextractor_forced_photo_detection_bands) > 1 

208 ) 

209 if par.weight_type != "NONE": 209 ↛ 211line 209 didn't jump to line 211 because the condition on line 209 was never true

210 # write weights for photometry band 

211 path_photometry_weight = sysmaps_util.get_path_temp_sextractor_weight( 

212 par, self.ctx.current_filter 

213 ) 

214 sysmaps_util.write_temp_sextractor_weight(par, path_photometry_weight) 

215 

216 # check if written weights can be removed after running SourceExtractor 

217 remove_photo_weights = path_detection_weight != path_photometry_weight 

218 remove_detection_weights = (self.ctx.current_filter == par.filters[-1]) & ( 

219 path_detection_weight != "NONE" 

220 ) 

221 

222 else: 

223 detection_weight_type = par.weight_type 

224 path_detection_weight = "NONE" 

225 path_photometry_weight = "NONE" 

226 remove_photo_weights = False 

227 remove_detection_weights = False 

228 

229 checkimages, checkimages_names, checkimages_names_hdf5 = get_checkimages( 

230 sextractor_catalog_name, 

231 par.sextractor_checkimages, 

232 par.sextractor_checkimages_suffixes, 

233 ) 

234 

235 if par.flag_gain_times_nexp: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 run_sextractor( 

237 par.sextractor_binary, 

238 (path_detection_image, par.image_name), 

239 par.sextractor_config, 

240 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale, 

241 satur_key="NONE", 

242 satur_level=par.saturation_level, 

243 mag_zeropoint=par.magzero, 

244 gain_key="NONE", 

245 gain=par.n_exp * par.gain, 

246 pixel_scale=par.pixscale, 

247 starnnw_name=par.sextractor_nnw, 

248 filter_name=par.sextractor_filter, 

249 parameters_name=par.sextractor_params, 

250 catalog_name=sextractor_catalog_name, 

251 checkimage_type=",".join(checkimages), 

252 checkimage_name=",".join(checkimages_names), 

253 weight_type=f"{detection_weight_type},{par.weight_type}", 

254 weight_gain=",".join([par.weight_gain] * 2), 

255 weight_image=(path_detection_weight, path_photometry_weight), 

256 catalog_type="FITS_LDAC", 

257 verbose_type="QUIET", 

258 ) 

259 

260 else: 

261 run_sextractor( 

262 par.sextractor_binary, 

263 (path_detection_image, par.image_name), 

264 par.sextractor_config, 

265 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale, 

266 satur_key="NONE", 

267 satur_level=par.saturation_level, 

268 mag_zeropoint=par.magzero, 

269 gain_key="NONE", 

270 gain=par.gain, 

271 pixel_scale=par.pixscale, 

272 starnnw_name=par.sextractor_nnw, 

273 filter_name=par.sextractor_filter, 

274 parameters_name=par.sextractor_params, 

275 catalog_name=sextractor_catalog_name, 

276 checkimage_type=",".join(checkimages), 

277 checkimage_name=",".join(checkimages_names), 

278 weight_type=f"{detection_weight_type},{par.weight_type}", 

279 weight_gain=",".join([par.weight_gain] * 2), 

280 catalog_type="FITS_LDAC", 

281 verbose_type="QUIET", 

282 ) 

283 

284 convert_fits_to_hdf(filepath_fits=sextractor_catalog_name) 

285 

286 checkimages_to_hdf( 

287 checkimage_type=checkimages, 

288 checkimages_names_fits=checkimages_names, 

289 checkimages_names_hdf5=checkimages_names_hdf5, 

290 ) 

291 

292 # Check whether detection and photo weight image should be removed 

293 if remove_photo_weights: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true

294 os.remove(path_photometry_weight) 

295 

296 if remove_detection_weights: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true

297 os.remove(path_detection_weight) 

298 

299 # Check if detection image should be removed 

300 if remove_detection_image: 

301 os.remove(path_detection_image) 

302 

303 def __str__(self): 

304 return NAME