Coverage for src/ufig/plugins/run_sextractor_forced_photometry.py: 89%
109 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-17 08:48 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-17 08:48 +0000
1# Copyright (c) 2016 ETH Zurich, Institute of Astronomy, Cosmology Research Group
3"""
4Created on Jul 12, 2016
5@author: Joerg Herbel
6"""
8import os
9import subprocess
10import tempfile
12import h5py
13import numpy as np
14from astropy.io import fits
15from cosmic_toolbox import arraytools as at
16from cosmic_toolbox import logger
17from ivy.plugin.base_plugin import BasePlugin
18from pkg_resources import resource_filename
20import ufig
21from ufig import array_util, sysmaps_util
22from ufig.plugins.write_image import get_seeing_value
24LOGGER = logger.get_logger(__file__)
26NAME = "run SourceExtractor (forced-photo)"
27HDF5_COMPRESS = {"compression": "gzip", "compression_opts": 9, "shuffle": True}
30def convert_fits_to_hdf(filepath_fits, fits_ext=2):
31 try:
32 cat = np.array(fits.getdata(filepath_fits, ext=fits_ext))
33 cat = cat.byteswap()
34 cat = cat.view(cat.dtype.newbyteorder())
35 except Exception as errmsg: # pragma: no cover
36 LOGGER.error(f"failed to fits-open {filepath_fits}, already hdf?")
37 LOGGER.error(errmsg)
39 os.remove(filepath_fits)
41 cat = array_util.rec_float64_to_float32(cat)
43 at.save_hdf_cols(filepath_fits, cat, compression=HDF5_COMPRESS)
45 LOGGER.info(f"converted to hdf: {filepath_fits}")
48def checkimages_to_hdf(
49 checkimage_type,
50 checkimages_names_fits,
51 checkimages_names_hdf5,
52 kw_dataset=None,
53):
54 if kw_dataset is None: 54 ↛ 60line 54 didn't jump to line 60 because the condition on line 54 was always true
55 kw_dataset = {
56 "compression": "gzip",
57 "compression_opts": 9,
58 "shuffle": True,
59 }
60 if len(checkimages_names_hdf5) > 0:
61 for c, f_fits, f_hdf5 in zip(
62 checkimage_type, checkimages_names_fits, checkimages_names_hdf5
63 ):
64 img = np.array(fits.getdata(f_fits))
65 with h5py.File(f_hdf5, "w") as f:
66 f.create_dataset(name=c, data=img, **kw_dataset)
67 os.remove(f_fits)
68 LOGGER.info(
69 f"converted checkimage={c} dtype={str(img.dtype)} {f_fits} -> {f_hdf5}"
70 )
73def enforce_abs_path(path):
74 """
75 Build an absolute path using the path of the SExtractor directory in UFig. In case
76 the input is already a path (and not only a filename), it is left unchanged.
78 :param path: Input path
79 :return: Absolute path
80 """
82 if path == os.path.basename(path): 82 ↛ 85line 82 didn't jump to line 85 because the condition on line 82 was always true
83 path = resource_filename(ufig.__name__, "res/sextractor/" + path)
85 return path
88def kwarg_to_sextractor_arg(key, value):
89 """
90 Construct a SExtractor command line argument from a keyword argument. The key of the
91 keyword argument must be a valid SExtractor parameter (modulo upper cases). The
92 value must convertible to a string. It can also be an iterable containing only
93 objects that can be converted to a string.
95 :param key: Key
96 :param value: Value
98 :return: SExtractor command line argument as a list in the form
99 [-<PARAM_NAME>, VALUE]
100 """
102 path_keys = ("starnnw_name", "parameters_name", "filter_name")
104 if str(value) != value:
105 try:
106 value = map(str, value)
107 except TypeError:
108 value = [str(value)]
109 value = ",".join(value)
111 if key in path_keys:
112 value = enforce_abs_path(value)
114 sextractor_arg = ["-" + key.upper(), value]
116 return sextractor_arg
119def build_sextractor_cmd(binary_name, image_name, config_name, **kwargs):
120 """
121 Construct a list of strings which make up a valid command line argument to call
122 SExtractor.
124 :param binary_name: Path of SExtractor executable
125 :param image_name: Path(s) of image(s) on which SExtractor will run
126 :param config_name: Path of SExtractor configuration file
127 :param kwargs: Keyword arguments that can be converted to SExtractor command line
128 arguments
130 :return: Command to call SExtractor as a list of strings
131 """
133 config_name = enforce_abs_path(config_name)
135 if str(image_name) != image_name: 135 ↛ 138line 135 didn't jump to line 138 because the condition on line 135 was always true
136 image_name = ",".join(image_name)
138 cmd = [binary_name, image_name, "-c", config_name]
140 for key in kwargs:
141 cmd += kwarg_to_sextractor_arg(key, kwargs[key])
143 return cmd
146def get_checkimages(
147 sextractor_catalog_name, sextractor_checkimages, sextractor_checkimages_suffixes
148):
149 catalog_name_short = sextractor_catalog_name.rsplit(".", 1)[0]
150 checkimages_names = []
151 checkimages_names_hdf5 = []
152 checkimages = sextractor_checkimages
154 for suffix in sextractor_checkimages_suffixes:
155 checkimage_name = catalog_name_short + suffix
156 checkimage_name_hdf5 = checkimage_name.replace(".fits", ".h5")
157 checkimages_names += [checkimage_name]
158 checkimages_names_hdf5 += [checkimage_name_hdf5]
160 if len(checkimages_names) == 0:
161 checkimages_names = ["NONE"]
162 checkimages = ["NONE"]
164 return checkimages, checkimages_names, checkimages_names_hdf5
167def run_sextractor(binary_name, image_name, config_name, **kwargs):
168 """
169 Run SExtractor by spawning a subprocess.
171 :param binary_name: Path of SExtractor executable
172 :param image_name: Path(s) of image(s) on which SExtractor will run
173 :param config_name: Path of SExtractor configuration file
174 :param kwargs: Keyword arguments that can be converted to SExtractor command line
175 arguments
176 """
177 cmd = build_sextractor_cmd(binary_name, image_name, config_name, **kwargs)
178 LOGGER.info(" ".join(cmd))
179 subprocess.check_call(cmd, stderr=subprocess.STDOUT)
182class Plugin(BasePlugin):
183 """
184 Run SExtractor in forced-photometry mode.
185 """
187 def __call__(self):
188 par = self.ctx.parameters
190 sextractor_catalog_name = par.sextractor_forced_photo_catalog_name_dict[
191 self.ctx.current_filter
192 ]
194 if self.ctx.parameters.sextractor_use_temp: 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true
195 sextractor_catalog_name = os.path.join(
196 tempfile.mkdtemp("sextractor"),
197 os.path.split(sextractor_catalog_name)[-1],
198 )
199 (
200 path_detection_image,
201 path_detection_weight,
202 detection_weight_type,
203 ) = sysmaps_util.get_detection_image(par)
204 remove_detection_image = (self.ctx.current_filter == par.filters[-1]) & (
205 len(par.sextractor_forced_photo_detection_bands) > 1
206 )
207 if par.weight_type != "NONE": 207 ↛ 209line 207 didn't jump to line 209 because the condition on line 207 was never true
208 # write weights for photometry band
209 path_photometry_weight = sysmaps_util.get_path_temp_sextractor_weight(
210 par, self.ctx.current_filter
211 )
212 sysmaps_util.write_temp_sextractor_weight(par, path_photometry_weight)
214 # check if written weights can be removed after running SourceExtractor
215 remove_photo_weights = path_detection_weight != path_photometry_weight
216 remove_detection_weights = (self.ctx.current_filter == par.filters[-1]) & (
217 path_detection_weight != "NONE"
218 )
220 else:
221 detection_weight_type = par.weight_type
222 path_detection_weight = "NONE"
223 path_photometry_weight = "NONE"
224 remove_photo_weights = False
225 remove_detection_weights = False
227 checkimages, checkimages_names, checkimages_names_hdf5 = get_checkimages(
228 sextractor_catalog_name,
229 par.sextractor_checkimages,
230 par.sextractor_checkimages_suffixes,
231 )
233 if par.flag_gain_times_nexp: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true
234 run_sextractor(
235 par.sextractor_binary,
236 (path_detection_image, par.image_name),
237 par.sextractor_config,
238 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale,
239 satur_key="NONE",
240 satur_level=par.saturation_level,
241 mag_zeropoint=par.magzero,
242 gain_key="NONE",
243 gain=par.n_exp * par.gain,
244 pixel_scale=par.pixscale,
245 starnnw_name=par.sextractor_nnw,
246 filter_name=par.sextractor_filter,
247 parameters_name=par.sextractor_params,
248 catalog_name=sextractor_catalog_name,
249 checkimage_type=",".join(checkimages),
250 checkimage_name=",".join(checkimages_names),
251 weight_type=f"{detection_weight_type},{par.weight_type}",
252 weight_gain=",".join([par.weight_gain] * 2),
253 weight_image=(path_detection_weight, path_photometry_weight),
254 catalog_type="FITS_LDAC",
255 verbose_type="QUIET",
256 )
258 else:
259 run_sextractor(
260 par.sextractor_binary,
261 (path_detection_image, par.image_name),
262 par.sextractor_config,
263 seeing_fwhm=get_seeing_value(self.ctx) * par.pixscale,
264 satur_key="NONE",
265 satur_level=par.saturation_level,
266 mag_zeropoint=par.magzero,
267 gain_key="NONE",
268 gain=par.gain,
269 pixel_scale=par.pixscale,
270 starnnw_name=par.sextractor_nnw,
271 filter_name=par.sextractor_filter,
272 parameters_name=par.sextractor_params,
273 catalog_name=sextractor_catalog_name,
274 checkimage_type=",".join(checkimages),
275 checkimage_name=",".join(checkimages_names),
276 weight_type=f"{detection_weight_type},{par.weight_type}",
277 weight_gain=",".join([par.weight_gain] * 2),
278 catalog_type="FITS_LDAC",
279 verbose_type="QUIET",
280 )
282 convert_fits_to_hdf(filepath_fits=sextractor_catalog_name)
284 checkimages_to_hdf(
285 checkimage_type=checkimages,
286 checkimages_names_fits=checkimages_names,
287 checkimages_names_hdf5=checkimages_names_hdf5,
288 )
290 # Check whether detection and photo weight image should be removed
291 if remove_photo_weights: 291 ↛ 292line 291 didn't jump to line 292 because the condition on line 291 was never true
292 os.remove(path_photometry_weight)
294 if remove_detection_weights: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true
295 os.remove(path_detection_weight)
297 # Check if detection image should be removed
298 if remove_detection_image:
299 os.remove(path_detection_image)
301 def __str__(self):
302 return NAME