Coverage for src/ufig/plugins/run_sextractor_forced_photometry.py: 88%
107 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1# Copyright (c) 2016 ETH Zurich, Institute of Astronomy, Cosmology Research Group
3"""
4Created on Jul 12, 2016
5@author: Joerg Herbel
6"""
8import os
9import subprocess
10import tempfile
12import h5py
13import numpy as np
14from astropy.io import fits
15from cosmic_toolbox import arraytools as at
16from cosmic_toolbox import logger
17from ivy.plugin.base_plugin import BasePlugin
18from pkg_resources import resource_filename
20import ufig
21from ufig import array_util, sysmaps_util
22from ufig.plugins.write_image import get_seeing_value
24LOGGER = logger.get_logger(__file__)
26NAME = "run SourceExtractor (forced-photo)"
27HDF5_COMPRESS = {"compression": "gzip", "compression_opts": 9, "shuffle": True}
30def convert_fits_to_hdf(filepath_fits, fits_ext=2):
31 try:
32 cat = (
33 np.array(fits.getdata(filepath_fits, ext=fits_ext))
34 .byteswap()
35 .newbyteorder()
36 )
37 except Exception as errmsg: # pragma: no cover
38 LOGGER.error(f"failed to fits-open {filepath_fits}, already hdf?")
39 LOGGER.error(errmsg)
41 os.remove(filepath_fits)
43 cat = array_util.rec_float64_to_float32(cat)
45 at.save_hdf_cols(filepath_fits, cat, compression=HDF5_COMPRESS)
47 LOGGER.info(f"converted to hdf: {filepath_fits}")
50def checkimages_to_hdf(
51 checkimage_type,
52 checkimages_names_fits,
53 checkimages_names_hdf5,
54 kw_dataset=None,
55):
56 if kw_dataset is None: 56 ↛ 62line 56 didn't jump to line 62 because the condition on line 56 was always true
57 kw_dataset = {
58 "compression": "gzip",
59 "compression_opts": 9,
60 "shuffle": True,
61 }
62 if len(checkimages_names_hdf5) > 0:
63 for c, f_fits, f_hdf5 in zip(
64 checkimage_type, checkimages_names_fits, checkimages_names_hdf5
65 ):
66 img = np.array(fits.getdata(f_fits))
67 with h5py.File(f_hdf5, "w") as f:
68 f.create_dataset(name=c, data=img, **kw_dataset)
69 os.remove(f_fits)
70 LOGGER.info(
71 f"converted checkimage={c} dtype={str(img.dtype)} {f_fits} -> {f_hdf5}"
72 )
75def enforce_abs_path(path):
76 """
77 Build an absolute path using the path of the SExtractor directory in UFig. In case
78 the input is already a path (and not only a filename), it is left unchanged.
80 :param path: Input path
81 :return: Absolute path
82 """
84 if path == os.path.basename(path): 84 ↛ 87line 84 didn't jump to line 87 because the condition on line 84 was always true
85 path = resource_filename(ufig.__name__, "res/sextractor/" + path)
87 return path
90def kwarg_to_sextractor_arg(key, value):
91 """
92 Construct a SExtractor command line argument from a keyword argument. The key of the
93 keyword argument must be a valid SExtractor parameter (modulo upper cases). The
94 value must convertible to a string. It can also be an iterable containing only
95 objects that can be converted to a string.
97 :param key: Key
98 :param value: Value
100 :return: SExtractor command line argument as a list in the form
101 [-<PARAM_NAME>, VALUE]
102 """
104 path_keys = ("starnnw_name", "parameters_name", "filter_name")
106 if str(value) != value:
107 try:
108 value = map(str, value)
109 except TypeError:
110 value = [str(value)]
111 value = ",".join(value)
113 if key in path_keys:
114 value = enforce_abs_path(value)
116 sextractor_arg = ["-" + key.upper(), value]
118 return sextractor_arg
121def build_sextractor_cmd(binary_name, image_name, config_name, **kwargs):
122 """
123 Construct a list of strings which make up a valid command line argument to call
124 SExtractor.
126 :param binary_name: Path of SExtractor executable
127 :param image_name: Path(s) of image(s) on which SExtractor will run
128 :param config_name: Path of SExtractor configuration file
129 :param kwargs: Keyword arguments that can be converted to SExtractor command line
130 arguments
132 :return: Command to call SExtractor as a list of strings
133 """
135 config_name = enforce_abs_path(config_name)
137 if str(image_name) != image_name: 137 ↛ 140line 137 didn't jump to line 140 because the condition on line 137 was always true
138 image_name = ",".join(image_name)
140 cmd = [binary_name, image_name, "-c", config_name]
142 for key in kwargs:
143 cmd += kwarg_to_sextractor_arg(key, kwargs[key])
145 return cmd
148def get_checkimages(
149 sextractor_catalog_name, sextractor_checkimages, sextractor_checkimages_suffixes
150):
151 catalog_name_short = sextractor_catalog_name.rsplit(".", 1)[0]
152 checkimages_names = []
153 checkimages_names_hdf5 = []
154 checkimages = sextractor_checkimages
156 for suffix in sextractor_checkimages_suffixes:
157 checkimage_name = catalog_name_short + suffix
158 checkimage_name_hdf5 = checkimage_name.replace(".fits", ".h5")
159 checkimages_names += [checkimage_name]
160 checkimages_names_hdf5 += [checkimage_name_hdf5]
162 if len(checkimages_names) == 0:
163 checkimages_names = ["NONE"]
164 checkimages = ["NONE"]
166 return checkimages, checkimages_names, checkimages_names_hdf5
169def run_sextractor(binary_name, image_name, config_name, **kwargs):
170 """
171 Run SExtractor by spawning a subprocess.
173 :param binary_name: Path of SExtractor executable
174 :param image_name: Path(s) of image(s) on which SExtractor will run
175 :param config_name: Path of SExtractor configuration file
176 :param kwargs: Keyword arguments that can be converted to SExtractor command line
177 arguments
178 """
179 cmd = build_sextractor_cmd(binary_name, image_name, config_name, **kwargs)
180 LOGGER.info(" ".join(cmd))
181 subprocess.check_call(cmd, stderr=subprocess.STDOUT)
184class Plugin(BasePlugin):
185 """
186 Run SExtractor in forced-photometry mode.
187 """
189 def __call__(self):
190 par = self.ctx.parameters
192 sextractor_catalog_name = par.sextractor_forced_photo_catalog_name_dict[
193 self.ctx.current_filter
194 ]
196 if self.ctx.parameters.sextractor_use_temp: 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true
197 sextractor_catalog_name = os.path.join(
198 tempfile.mkdtemp("sextractor"),
199 os.path.split(sextractor_catalog_name)[-1],
200 )
201 (
202 path_detection_image,
203 path_detection_weight,
204 detection_weight_type,
205 ) = sysmaps_util.get_detection_image(par)
206 remove_detection_image = (self.ctx.current_filter == par.filters[-1]) & (
207 len(par.sextractor_forced_photo_detection_bands) > 1
208 )
209 if par.weight_type != "NONE": 209 ↛ 211line 209 didn't jump to line 211 because the condition on line 209 was never true
210 # write weights for photometry band
211 path_photometry_weight = sysmaps_util.get_path_temp_sextractor_weight(
212 par, self.ctx.current_filter
213 )
214 sysmaps_util.write_temp_sextractor_weight(par, path_photometry_weight)
216 # check if written weights can be removed after running SourceExtractor
217 remove_photo_weights = path_detection_weight != path_photometry_weight
218 remove_detection_weights = (self.ctx.current_filter == par.filters[-1]) & (
219 path_detection_weight != "NONE"
220 )
222 else:
223 detection_weight_type = par.weight_type
224 path_detection_weight = "NONE"
225 path_photometry_weight = "NONE"
226 remove_photo_weights = False
227 remove_detection_weights = False
229 checkimages, checkimages_names, checkimages_names_hdf5 = get_checkimages(
230 sextractor_catalog_name,
231 par.sextractor_checkimages,
232 par.sextractor_checkimages_suffixes,
233 )
235 if par.flag_gain_times_nexp: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true
236 run_sextractor(
237 par.sextractor_binary,
238 (path_detection_image, par.image_name),
239 par.sextractor_config,
240 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale,
241 satur_key="NONE",
242 satur_level=par.saturation_level,
243 mag_zeropoint=par.magzero,
244 gain_key="NONE",
245 gain=par.n_exp * par.gain,
246 pixel_scale=par.pixscale,
247 starnnw_name=par.sextractor_nnw,
248 filter_name=par.sextractor_filter,
249 parameters_name=par.sextractor_params,
250 catalog_name=sextractor_catalog_name,
251 checkimage_type=",".join(checkimages),
252 checkimage_name=",".join(checkimages_names),
253 weight_type=f"{detection_weight_type},{par.weight_type}",
254 weight_gain=",".join([par.weight_gain] * 2),
255 weight_image=(path_detection_weight, path_photometry_weight),
256 catalog_type="FITS_LDAC",
257 verbose_type="QUIET",
258 )
260 else:
261 run_sextractor(
262 par.sextractor_binary,
263 (path_detection_image, par.image_name),
264 par.sextractor_config,
265 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale,
266 satur_key="NONE",
267 satur_level=par.saturation_level,
268 mag_zeropoint=par.magzero,
269 gain_key="NONE",
270 gain=par.gain,
271 pixel_scale=par.pixscale,
272 starnnw_name=par.sextractor_nnw,
273 filter_name=par.sextractor_filter,
274 parameters_name=par.sextractor_params,
275 catalog_name=sextractor_catalog_name,
276 checkimage_type=",".join(checkimages),
277 checkimage_name=",".join(checkimages_names),
278 weight_type=f"{detection_weight_type},{par.weight_type}",
279 weight_gain=",".join([par.weight_gain] * 2),
280 catalog_type="FITS_LDAC",
281 verbose_type="QUIET",
282 )
284 convert_fits_to_hdf(filepath_fits=sextractor_catalog_name)
286 checkimages_to_hdf(
287 checkimage_type=checkimages,
288 checkimages_names_fits=checkimages_names,
289 checkimages_names_hdf5=checkimages_names_hdf5,
290 )
292 # Check whether detection and photo weight image should be removed
293 if remove_photo_weights: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 os.remove(path_photometry_weight)
296 if remove_detection_weights: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 os.remove(path_detection_weight)
299 # Check if detection image should be removed
300 if remove_detection_image:
301 os.remove(path_detection_image)
303 def __str__(self):
304 return NAME