Coverage for src/ufig/plugins/run_sextractor_forced_photometry.py: 89%

109 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-17 08:48 +0000

1# Copyright (c) 2016 ETH Zurich, Institute of Astronomy, Cosmology Research Group 

2 

3""" 

4Created on Jul 12, 2016 

5@author: Joerg Herbel 

6""" 

7 

8import os 

9import subprocess 

10import tempfile 

11 

12import h5py 

13import numpy as np 

14from astropy.io import fits 

15from cosmic_toolbox import arraytools as at 

16from cosmic_toolbox import logger 

17from ivy.plugin.base_plugin import BasePlugin 

18from pkg_resources import resource_filename 

19 

20import ufig 

21from ufig import array_util, sysmaps_util 

22from ufig.plugins.write_image import get_seeing_value 

23 

24LOGGER = logger.get_logger(__file__) 

25 

26NAME = "run SourceExtractor (forced-photo)" 

27HDF5_COMPRESS = {"compression": "gzip", "compression_opts": 9, "shuffle": True} 

28 

29 

30def convert_fits_to_hdf(filepath_fits, fits_ext=2): 

31 try: 

32 cat = np.array(fits.getdata(filepath_fits, ext=fits_ext)) 

33 cat = cat.byteswap() 

34 cat = cat.view(cat.dtype.newbyteorder()) 

35 except Exception as errmsg: # pragma: no cover 

36 LOGGER.error(f"failed to fits-open {filepath_fits}, already hdf?") 

37 LOGGER.error(errmsg) 

38 

39 os.remove(filepath_fits) 

40 

41 cat = array_util.rec_float64_to_float32(cat) 

42 

43 at.save_hdf_cols(filepath_fits, cat, compression=HDF5_COMPRESS) 

44 

45 LOGGER.info(f"converted to hdf: {filepath_fits}") 

46 

47 

48def checkimages_to_hdf( 

49 checkimage_type, 

50 checkimages_names_fits, 

51 checkimages_names_hdf5, 

52 kw_dataset=None, 

53): 

54 if kw_dataset is None: 54 ↛ 60line 54 didn't jump to line 60 because the condition on line 54 was always true

55 kw_dataset = { 

56 "compression": "gzip", 

57 "compression_opts": 9, 

58 "shuffle": True, 

59 } 

60 if len(checkimages_names_hdf5) > 0: 

61 for c, f_fits, f_hdf5 in zip( 

62 checkimage_type, checkimages_names_fits, checkimages_names_hdf5 

63 ): 

64 img = np.array(fits.getdata(f_fits)) 

65 with h5py.File(f_hdf5, "w") as f: 

66 f.create_dataset(name=c, data=img, **kw_dataset) 

67 os.remove(f_fits) 

68 LOGGER.info( 

69 f"converted checkimage={c} dtype={str(img.dtype)} {f_fits} -> {f_hdf5}" 

70 ) 

71 

72 

73def enforce_abs_path(path): 

74 """ 

75 Build an absolute path using the path of the SExtractor directory in UFig. In case 

76 the input is already a path (and not only a filename), it is left unchanged. 

77 

78 :param path: Input path 

79 :return: Absolute path 

80 """ 

81 

82 if path == os.path.basename(path): 82 ↛ 85line 82 didn't jump to line 85 because the condition on line 82 was always true

83 path = resource_filename(ufig.__name__, "res/sextractor/" + path) 

84 

85 return path 

86 

87 

88def kwarg_to_sextractor_arg(key, value): 

89 """ 

90 Construct a SExtractor command line argument from a keyword argument. The key of the 

91 keyword argument must be a valid SExtractor parameter (modulo upper cases). The 

92 value must convertible to a string. It can also be an iterable containing only 

93 objects that can be converted to a string. 

94 

95 :param key: Key 

96 :param value: Value 

97 

98 :return: SExtractor command line argument as a list in the form 

99 [-<PARAM_NAME>, VALUE] 

100 """ 

101 

102 path_keys = ("starnnw_name", "parameters_name", "filter_name") 

103 

104 if str(value) != value: 

105 try: 

106 value = map(str, value) 

107 except TypeError: 

108 value = [str(value)] 

109 value = ",".join(value) 

110 

111 if key in path_keys: 

112 value = enforce_abs_path(value) 

113 

114 sextractor_arg = ["-" + key.upper(), value] 

115 

116 return sextractor_arg 

117 

118 

119def build_sextractor_cmd(binary_name, image_name, config_name, **kwargs): 

120 """ 

121 Construct a list of strings which make up a valid command line argument to call 

122 SExtractor. 

123 

124 :param binary_name: Path of SExtractor executable 

125 :param image_name: Path(s) of image(s) on which SExtractor will run 

126 :param config_name: Path of SExtractor configuration file 

127 :param kwargs: Keyword arguments that can be converted to SExtractor command line 

128 arguments 

129 

130 :return: Command to call SExtractor as a list of strings 

131 """ 

132 

133 config_name = enforce_abs_path(config_name) 

134 

135 if str(image_name) != image_name: 135 ↛ 138line 135 didn't jump to line 138 because the condition on line 135 was always true

136 image_name = ",".join(image_name) 

137 

138 cmd = [binary_name, image_name, "-c", config_name] 

139 

140 for key in kwargs: 

141 cmd += kwarg_to_sextractor_arg(key, kwargs[key]) 

142 

143 return cmd 

144 

145 

146def get_checkimages( 

147 sextractor_catalog_name, sextractor_checkimages, sextractor_checkimages_suffixes 

148): 

149 catalog_name_short = sextractor_catalog_name.rsplit(".", 1)[0] 

150 checkimages_names = [] 

151 checkimages_names_hdf5 = [] 

152 checkimages = sextractor_checkimages 

153 

154 for suffix in sextractor_checkimages_suffixes: 

155 checkimage_name = catalog_name_short + suffix 

156 checkimage_name_hdf5 = checkimage_name.replace(".fits", ".h5") 

157 checkimages_names += [checkimage_name] 

158 checkimages_names_hdf5 += [checkimage_name_hdf5] 

159 

160 if len(checkimages_names) == 0: 

161 checkimages_names = ["NONE"] 

162 checkimages = ["NONE"] 

163 

164 return checkimages, checkimages_names, checkimages_names_hdf5 

165 

166 

167def run_sextractor(binary_name, image_name, config_name, **kwargs): 

168 """ 

169 Run SExtractor by spawning a subprocess. 

170 

171 :param binary_name: Path of SExtractor executable 

172 :param image_name: Path(s) of image(s) on which SExtractor will run 

173 :param config_name: Path of SExtractor configuration file 

174 :param kwargs: Keyword arguments that can be converted to SExtractor command line 

175 arguments 

176 """ 

177 cmd = build_sextractor_cmd(binary_name, image_name, config_name, **kwargs) 

178 LOGGER.info(" ".join(cmd)) 

179 subprocess.check_call(cmd, stderr=subprocess.STDOUT) 

180 

181 

182class Plugin(BasePlugin): 

183 """ 

184 Run SExtractor in forced-photometry mode. 

185 """ 

186 

187 def __call__(self): 

188 par = self.ctx.parameters 

189 

190 sextractor_catalog_name = par.sextractor_forced_photo_catalog_name_dict[ 

191 self.ctx.current_filter 

192 ] 

193 

194 if self.ctx.parameters.sextractor_use_temp: 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true

195 sextractor_catalog_name = os.path.join( 

196 tempfile.mkdtemp("sextractor"), 

197 os.path.split(sextractor_catalog_name)[-1], 

198 ) 

199 ( 

200 path_detection_image, 

201 path_detection_weight, 

202 detection_weight_type, 

203 ) = sysmaps_util.get_detection_image(par) 

204 remove_detection_image = (self.ctx.current_filter == par.filters[-1]) & ( 

205 len(par.sextractor_forced_photo_detection_bands) > 1 

206 ) 

207 if par.weight_type != "NONE": 207 ↛ 209line 207 didn't jump to line 209 because the condition on line 207 was never true

208 # write weights for photometry band 

209 path_photometry_weight = sysmaps_util.get_path_temp_sextractor_weight( 

210 par, self.ctx.current_filter 

211 ) 

212 sysmaps_util.write_temp_sextractor_weight(par, path_photometry_weight) 

213 

214 # check if written weights can be removed after running SourceExtractor 

215 remove_photo_weights = path_detection_weight != path_photometry_weight 

216 remove_detection_weights = (self.ctx.current_filter == par.filters[-1]) & ( 

217 path_detection_weight != "NONE" 

218 ) 

219 

220 else: 

221 detection_weight_type = par.weight_type 

222 path_detection_weight = "NONE" 

223 path_photometry_weight = "NONE" 

224 remove_photo_weights = False 

225 remove_detection_weights = False 

226 

227 checkimages, checkimages_names, checkimages_names_hdf5 = get_checkimages( 

228 sextractor_catalog_name, 

229 par.sextractor_checkimages, 

230 par.sextractor_checkimages_suffixes, 

231 ) 

232 

233 if par.flag_gain_times_nexp: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 run_sextractor( 

235 par.sextractor_binary, 

236 (path_detection_image, par.image_name), 

237 par.sextractor_config, 

238 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale, 

239 satur_key="NONE", 

240 satur_level=par.saturation_level, 

241 mag_zeropoint=par.magzero, 

242 gain_key="NONE", 

243 gain=par.n_exp * par.gain, 

244 pixel_scale=par.pixscale, 

245 starnnw_name=par.sextractor_nnw, 

246 filter_name=par.sextractor_filter, 

247 parameters_name=par.sextractor_params, 

248 catalog_name=sextractor_catalog_name, 

249 checkimage_type=",".join(checkimages), 

250 checkimage_name=",".join(checkimages_names), 

251 weight_type=f"{detection_weight_type},{par.weight_type}", 

252 weight_gain=",".join([par.weight_gain] * 2), 

253 weight_image=(path_detection_weight, path_photometry_weight), 

254 catalog_type="FITS_LDAC", 

255 verbose_type="QUIET", 

256 ) 

257 

258 else: 

259 run_sextractor( 

260 par.sextractor_binary, 

261 (path_detection_image, par.image_name), 

262 par.sextractor_config, 

263 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale, 

264 satur_key="NONE", 

265 satur_level=par.saturation_level, 

266 mag_zeropoint=par.magzero, 

267 gain_key="NONE", 

268 gain=par.gain, 

269 pixel_scale=par.pixscale, 

270 starnnw_name=par.sextractor_nnw, 

271 filter_name=par.sextractor_filter, 

272 parameters_name=par.sextractor_params, 

273 catalog_name=sextractor_catalog_name, 

274 checkimage_type=",".join(checkimages), 

275 checkimage_name=",".join(checkimages_names), 

276 weight_type=f"{detection_weight_type},{par.weight_type}", 

277 weight_gain=",".join([par.weight_gain] * 2), 

278 catalog_type="FITS_LDAC", 

279 verbose_type="QUIET", 

280 ) 

281 

282 convert_fits_to_hdf(filepath_fits=sextractor_catalog_name) 

283 

284 checkimages_to_hdf( 

285 checkimage_type=checkimages, 

286 checkimages_names_fits=checkimages_names, 

287 checkimages_names_hdf5=checkimages_names_hdf5, 

288 ) 

289 

290 # Check whether detection and photo weight image should be removed 

291 if remove_photo_weights: 291 ↛ 292line 291 didn't jump to line 292 because the condition on line 291 was never true

292 os.remove(path_photometry_weight) 

293 

294 if remove_detection_weights: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true

295 os.remove(path_detection_weight) 

296 

297 # Check if detection image should be removed 

298 if remove_detection_image: 

299 os.remove(path_detection_image) 

300 

301 def __str__(self): 

302 return NAME