Coverage for src/ufig/plugins/single_band_setup.py: 97%
120 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1# Copyright (C) 2016 ETH Zurich, Institute for Astronomy
3"""
4Created on Feb 09, 2016
5author: Joerg Herbel
6"""
8import numpy as np
9from ivy.plugin.base_plugin import BasePlugin
11from ufig import io_util, sysmaps_util
13NAME = "setup single-band"
16def initialize_shape_size_columns(galaxies, numgalaxies, precision):
17 # Shear
18 for col in ["gamma1", "gamma2", "kappa"]:
19 if not hasattr(galaxies, col):
20 setattr(galaxies, col, np.zeros(numgalaxies, dtype=precision))
22 # Size and shape after shear
23 for col in ("e1", "e2", "r50"):
24 if not hasattr(galaxies, col):
25 setattr(galaxies, col, getattr(galaxies, f"int_{col}").copy())
28def initialize_psf_columns(obj, n_obj, band, precision=np.float32):
29 """
30 Adds a negligible PSF to simulated objects (stars or galaxies)
31 :param obj: object catalog (stars or galaxies)
32 :param n_obj: number of objects
33 """
35 # set defaults
36 cols_init = {}
37 cols_init["psf_beta"] = lambda: np.full((n_obj, 1), 2.0, dtype=precision)
38 cols_init["psf_flux_ratio"] = lambda: np.ones(n_obj, dtype=precision)
39 cols_init["psf_fwhm"] = lambda: np.full(n_obj, 0.0001, dtype=precision)
40 cols_init["psf_r50"] = lambda: np.full(n_obj, 0.0001, dtype=precision)
41 cols_init["psf_r50_indiv"] = lambda: np.full(n_obj, 0.0001, dtype=precision).T
42 cols_init["psf_e1"] = lambda: np.zeros(n_obj, dtype=precision)
43 cols_init["psf_e2"] = lambda: np.zeros(n_obj, dtype=precision)
44 cols_init["psf_f1"] = lambda: np.zeros(n_obj, dtype=precision)
45 cols_init["psf_f2"] = lambda: np.zeros(n_obj, dtype=precision)
46 cols_init["psf_g1"] = lambda: np.zeros(n_obj, dtype=precision)
47 cols_init["psf_g2"] = lambda: np.zeros(n_obj, dtype=precision)
48 cols_init["psf_kurtosis"] = lambda: np.zeros(n_obj, dtype=precision)
49 cols_init["psf_dx_offset"] = lambda: np.zeros(n_obj, dtype=precision).T
50 cols_init["psf_dy_offset"] = lambda: np.zeros(n_obj, dtype=precision).T
52 for col in cols_init:
53 dict_col_name = f"{col}_dict"
55 # initialize dict if does not exist
56 if not hasattr(obj, dict_col_name):
57 setattr(obj, dict_col_name, {})
59 # assign PSF column to band
60 dict_col = getattr(obj, dict_col_name)
62 # initialize values if fdo not exist
63 if band not in dict_col:
64 dict_col[band] = cols_init[col]()
66 # set the column to the current filter
67 setattr(obj, col, dict_col[band])
70class UFigNumPhotError(ValueError):
71 """
72 Raised when more photons (for galaxies) than allowed by the input parameters are
73 sampled
74 """
77def convert_magnitude_to_nphot_const_texp(
78 mag, par, x=None, y=None, texp_img=None, n_exp=None
79):
80 """
81 Convert the magnitude into a number of photons to be sampled later on
83 :param mag: magnitude of the objects
84 :param par: Container of ctx.parameters parameters (magzero & gain)
85 :param x: x-coordinate of objects (in pixels) (not used here)
86 :param y: y-coordinate of objects (in pixels) (not used here)
87 :param texp_img: Image contining the exposure times at each position (not used here)
88 :param n_exp: number of exposures
89 :return: Number of photons
90 """
91 nphot_mean = np.power(10.0, 0.4 * (par.magzero - mag)) * par.gain * n_exp
92 nphot = np.round(np.random.poisson(nphot_mean) / n_exp)
94 return nphot
97def convert_magnitude_to_nphot_const_gain(
98 mag, par, gain, x=None, y=None, texp_img=None, n_exp=None
99):
100 """
101 Convert the magnitude into a number of photons to be sampled later on
103 :param mag: magnitude of the objects
104 :param par: Container of ctx.parameters parameters (magzero & gain)
105 :param x: x-coordinate of objects (in pixels) (not used here)
106 :param y: y-coordinate of objects (in pixels) (not used here)
107 :param texp_img: Image contining the exposure times at each position (not used here)
108 :param n_exp: number of exposures
109 :return: Number of photons
110 """
111 nphot_mean = np.power(10.0, 0.4 * (par.magzero - mag)) * gain
112 nphot = np.random.poisson(nphot_mean)
114 return nphot
117def get_texp_per_object(par, x, y, texp_img=None):
118 if texp_img is None: 118 ↛ 129line 118 didn't jump to line 129 because the condition on line 118 was always true
119 filename_h5, dataset = sysmaps_util.get_hdf_location_exptime(par)
120 filepath_h5 = io_util.get_abs_path(filename_h5, par.maps_remote_dir)
121 texp_img = io_util.load_from_hdf5(
122 file_name=filepath_h5,
123 hdf5_keys=dataset,
124 hdf5_path="",
125 root_path=par.maps_remote_dir,
126 )
128 # proof in case of position of of image
129 obj_x = x.astype(np.int32)
130 obj_x[obj_x > texp_img.shape[1] - 1] = texp_img.shape[1] - 1
131 obj_x[obj_x < 0] = 0
133 obj_y = y.astype(np.int32)
134 obj_y[obj_y > texp_img.shape[0] - 1] = texp_img.shape[0] - 1
135 obj_y[obj_y < 0] = 0
137 texp_obj = texp_img[obj_y, obj_x]
139 return texp_obj
142def get_gain_per_object(par, x, y, gain_img=None):
143 if gain_img is None: 143 ↛ 154line 143 didn't jump to line 154 because the condition on line 143 was always true
144 filename_h5, dataset = sysmaps_util.get_hdf_location_gain(par)
145 filepath_h5 = io_util.get_abs_path(filename_h5, par.maps_remote_dir)
146 gain_img = io_util.load_from_hdf5(
147 file_name=filepath_h5,
148 hdf5_keys=dataset,
149 hdf5_path="",
150 root_path=par.maps_remote_dir,
151 )
153 # proof in case of position of image
154 obj_x = x.astype(np.int32)
155 obj_x[obj_x > gain_img.shape[1] - 1] = gain_img.shape[1] - 1
156 obj_x[obj_x < 0] = 0
158 obj_y = y.astype(np.int32)
159 obj_y[obj_y > gain_img.shape[0] - 1] = gain_img.shape[0] - 1
160 obj_y[obj_y < 0] = 0
162 gain_obj = gain_img[obj_y, obj_x]
164 return gain_obj
167def convert_magnitude_to_nphot_variable_texp(mag, par, x, y, texp_img=None, n_exp=None):
168 """
169 Convert the magnitude into a number of photons to be sampled later on
171 :param mag: magnitude of the objects
172 :param par: self.ctx.parameters; must contain:
173 magzero: magnitude zero point of target image
174 gain: gain of the target image
175 exp_time_file_name: file name of the exposure time image
176 maps_remote_dir: Remote directory images and maps are stored in if not at
177 'res/maps/'
178 :param x: x-coordinate of objects (in pixels)
179 :param y: y-coordinate of objects (in pixels)
180 :param texp_img: Image contining the exposure times at each position
181 :param n_exp: number of exposures (not used here)
182 :return: Number of photons
183 """
185 texp_obj = get_texp_per_object(par, x, y)
187 nphot = np.zeros_like(texp_obj, dtype=np.int32)
188 mask_texp = (texp_obj > 0) & (texp_obj >= par.exposure_time)
189 nphot[mask_texp] = convert_magnitude_to_nphot_const_texp(
190 mag=mag[mask_texp], par=par, n_exp=texp_obj[mask_texp] // par.exposure_time
191 )
193 return nphot
196def convert_magnitude_to_nphot_with_gain_map(mag, par, x, y, **args):
197 """
198 Convert the magnitude into a number of photons to be sampled later on
200 :param mag: magnitude of the objects
201 :param par: self.ctx.parameters; must contain:
202 magzero: magnitude zero point of target image
203 gain: gain of the target image
204 exp_time_file_name: file name of the exposure time image
205 maps_remote_dir: Remote directory images and maps are stored in if not at
206 'res/maps/'
207 :param x: x-coordinate of objects (in pixels)
208 :param y: y-coordinate of objects (in pixels)
209 :param texp_img: Image contining the exposure times at each position
210 :param n_exp: number of exposures (not used here)
211 :return: Number of photons
212 """
214 gain_obj = get_gain_per_object(par, x, y)
216 nphot = np.zeros_like(gain_obj, dtype=np.int32)
217 nphot = convert_magnitude_to_nphot_const_gain(mag=mag, par=par, gain=gain_obj)
219 return nphot
222NPHOT_GENERATOR = {
223 "constant": convert_magnitude_to_nphot_const_texp,
224 "variable": convert_magnitude_to_nphot_variable_texp,
225 "gain_map": convert_magnitude_to_nphot_with_gain_map,
226}
229class Plugin(BasePlugin):
230 """
231 Set parameters to render an image according to the current filter band and
232 multi-band dictionaries. The number of photons is also calculated here.
233 """
235 def __call__(self):
236 par = self.ctx.parameters
237 nphot_generator = NPHOT_GENERATOR[par.exp_time_type]
239 # Image and general seed
240 np.random.seed(par.seed)
241 self.ctx.image = np.zeros((par.size_y, par.size_x), dtype=par.image_precision)
242 self.ctx.image_mask = np.zeros((par.size_y, par.size_x), dtype=bool)
244 # Specific seeds
245 for seed_name in [
246 "gal_nphot_seed_offset",
247 "star_nphot_seed_offset",
248 "gal_render_seed_offset",
249 "star_render_seed_offset",
250 "background_seed_offset",
251 "seed_ngal",
252 ]:
253 setattr(par, seed_name, getattr(par, seed_name) + 1)
255 # Set quantities from corresponding dictionaries
256 filter_band_params = [
257 param_name for param_name in par if param_name.endswith("_dict")
258 ]
259 for param_name in filter_band_params:
260 try:
261 param_name_stripped = param_name[:-5]
262 setattr(
263 par,
264 param_name_stripped,
265 getattr(par, param_name)[self.ctx.current_filter],
266 )
267 except KeyError:
268 pass
270 if "galaxies" in self.ctx:
271 add_galaxy_col = [
272 "nphot",
273 "gamma1",
274 "gamma2",
275 "kappa",
276 "e1",
277 "e2",
278 "r50",
279 "int_mag",
280 "mag",
281 "abs_mag",
282 "psf_beta",
283 "psf_flux_ratio",
284 "psf_fwhm",
285 "psf_r50",
286 "psf_e1",
287 "psf_e2",
288 "psf_f1",
289 "psf_f2",
290 "psf_g1",
291 "psf_g2",
292 "psf_kurtosis",
293 ]
295 self.ctx.galaxies.columns = list(
296 set(self.ctx.galaxies.columns) | set(add_galaxy_col)
297 )
299 # Initial values for galaxy shapes
300 initialize_shape_size_columns(
301 self.ctx.galaxies, self.ctx.numgalaxies, precision=par.catalog_precision
302 )
304 # Values emulating a negligible PSF effect (overwritten by add_psf-module)
305 initialize_psf_columns(
306 self.ctx.galaxies,
307 self.ctx.numgalaxies,
308 band=self.ctx.current_filter,
309 precision=par.catalog_precision,
310 )
312 #
314 # Magnitudes and numbers of photons
315 self.ctx.galaxies.int_mag = self.ctx.galaxies.int_magnitude_dict[
316 self.ctx.current_filter
317 ].astype(par.catalog_precision)
318 self.ctx.galaxies.abs_mag = self.ctx.galaxies.abs_magnitude_dict[
319 self.ctx.current_filter
320 ].astype(par.catalog_precision)
321 self.ctx.galaxies.mag = self.ctx.galaxies.magnitude_dict[
322 self.ctx.current_filter
323 ].astype(par.catalog_precision)
324 np.random.seed(par.seed + par.gal_nphot_seed_offset)
325 self.ctx.galaxies.nphot = nphot_generator(
326 self.ctx.galaxies.mag,
327 par=par,
328 x=self.ctx.galaxies.x,
329 y=self.ctx.galaxies.y,
330 n_exp=par.n_exp,
331 )
333 sum_nphot = np.sum(self.ctx.galaxies.nphot)
335 if sum_nphot > par.n_phot_sum_gal_max: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 raise UFigNumPhotError(
337 f"Maximum number of photons: {par.n_phot_sum_gal_max}"
338 f" Computed number: {sum_nphot}"
339 )
341 if "stars" in self.ctx: 341 ↛ exitline 341 didn't return from function '__call__' because the condition on line 341 was always true
342 add_star_col = [
343 "nphot",
344 "mag",
345 "psf_flux_ratio",
346 "psf_fwhm",
347 "psf_r50",
348 "psf_e1",
349 "psf_e2",
350 "psf_f1",
351 "psf_f2",
352 "psf_g1",
353 "psf_g2",
354 "psf_kurtosis",
355 ]
357 self.ctx.stars.columns = list(
358 set(self.ctx.stars.columns) | set(add_star_col)
359 )
361 # Values emulating a negligible PSF effect (overwritten by add_psf-module)
362 initialize_psf_columns(
363 self.ctx.stars,
364 self.ctx.numstars,
365 band=self.ctx.current_filter,
366 precision=par.catalog_precision,
367 )
369 # Magnitudes and numbers of photons
370 self.ctx.stars.mag = self.ctx.stars.magnitude_dict[self.ctx.current_filter]
371 np.random.seed(par.seed + par.star_nphot_seed_offset)
372 self.ctx.stars.nphot = nphot_generator(
373 self.ctx.stars.mag,
374 par=par,
375 x=self.ctx.stars.x,
376 y=self.ctx.stars.y,
377 n_exp=par.n_exp,
378 )
380 def __str__(self):
381 return NAME