Coverage for src/ufig/plugins/draw_stars_besancon_map.py: 94%
178 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
1# Copyright (C) 2016 ETH Zurich, Institute for Astronomy
3"""
4Created on Jul 20, 2016
5author: Tomasz Kacprzak
6"""
8import warnings
10import h5py
11import healpy as hp
12import numpy as np
13import six
14from cosmic_toolbox import arraytools as at
15from ivy.plugin.base_plugin import BasePlugin
17from .. import coordinate_util, io_util, sampling_util
18from ..sampling_util import sample_position_uniform
20if six.PY2: # pragma: no cover
21 import cPickle as pickle
22else:
23 import pickle
26NAME = "draw stars"
29def get_interp_nearest(ra_new, dec_new, dict_besancon_info):
30 ipix = coordinate_util.radec2pix(
31 ra=ra_new, dec=dec_new, nside=dict_besancon_info["nside"]
32 )
33 if dict_besancon_info["healpix_list"][ipix] is None: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true
34 warnings.warn(
35 "Besancon model: queried position is outside the area covered by the model;"
36 " falling back to default",
37 stacklevel=1,
38 )
39 ipix = 10000
41 closest_cat = dict_besancon_info["healpix_list"][ipix]
43 return closest_cat
46def load_besancon_map_info(ctx):
47 par = ctx.parameters
49 filepath_map = io_util.get_abs_path(par.besancon_map_path, par.maps_remote_dir)
51 dict_besancon_info = {}
53 with h5py.File(filepath_map, "r") as ff:
54 dict_besancon_info["simulation_area"] = float(np.array(ff["simulation_area"]))
55 dict_besancon_info["nside"] = int(np.array(ff["nside"]))
57 return dict_besancon_info
60def load_besancon_map_pixels(ctx, list_ipix):
61 """
62 Simply load the pickle with maps, from par.besancon_map_path
64 return pickle with the Besancon simulation map model, with fields
66 dict_besancon_info={'simulation_area': area_store, 'nside': nside,
67 'healpix_list': list_df, 'healpix_mask': hp_map}
69 simulation_area - area in sq. deg corresponding to the simulation catalog covers
71 nside - resolution of simulation sampling
73 healpix_list - a list of numpy record arrays, each element corresponds to a pixel on
74 HEALPix grid with nside=nside
76 hp_map - HEALPix map showing the coverage of simulations, ring=False
77 """
79 par = ctx.parameters
81 filepath_map = io_util.get_abs_path(par.besancon_map_path, par.maps_remote_dir)
83 dist_besancon_pix_cat = {}
85 with h5py.File(filepath_map, "r") as ff:
86 for ipix in list_ipix:
87 if len(ff[f"healpix_list/{ipix:04d}"]) == 0: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true
88 besancon_pix_cat = None
89 else:
90 besancon_pix_cat = np.array(ff[f"healpix_list/{ipix:04d}"])
92 dist_besancon_pix_cat[f"{ipix:04d}"] = besancon_pix_cat
94 return dist_besancon_pix_cat
97def load_besancon_map(ctx):
98 """
99 Simply load the pickle with maps, from par.besancon_map_path
100 return pickle with the Besancon simulation map model, with fields
102 dict_besancon_info = {'simulation_area': area_store, 'nside': nside,
103 'healpix_list': list_df, 'healpix_mask': hp_map}
105 simulation_area - area in sq. deg corresponding to the simulation catalog covers
107 nside - resolution of simulation sampling
109 healpix_list - a list of numpy record arrays, each element corresponds
110 to a pixel on HEALPix grid with nside=nside
112 hp_map - HEALPix map showing the coverage of simulations, ring=False
113 """
115 par = ctx.parameters
117 filepath_map = io_util.get_abs_path(par.besancon_map_path, par.maps_remote_dir)
119 if ".pkl" in par.besancon_map_path: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true
120 with open(filepath_map) as ff:
121 dict_besancon_info = pickle.load(ff)
123 elif ".h5" in par.besancon_map_path: 123 ↛ 141line 123 didn't jump to line 141 because the condition on line 123 was always true
124 dict_besancon_info = {}
125 with h5py.File(filepath_map, "r") as ff:
126 dict_besancon_info["healpix_mask"] = np.array(ff["healpix_mask"])
127 dict_besancon_info["simulation_area"] = float(
128 np.array(ff["simulation_area"])
129 )
130 dict_besancon_info["nside"] = int(np.array(ff["nside"]))
131 n_pix = len(dict_besancon_info["healpix_mask"])
132 dict_besancon_info["healpix_list"] = [None] * n_pix
133 for ip in range(n_pix):
134 if len(ff[f"healpix_list/{ip:04d}"]) == 0:
135 dict_besancon_info["healpix_list"][ip] = None
136 else:
137 dict_besancon_info["healpix_list"][ip] = np.array(
138 ff[f"healpix_list/{ip:04d}"]
139 )
141 return dict_besancon_info
144def get_star_cat_besancon(ctx):
145 par = ctx.parameters
147 w = coordinate_util.tile_in_skycoords(
148 pixscale=par.pixscale,
149 ra0=par.ra0,
150 dec0=par.dec0,
151 crpix_ra=par.crpix_ra,
152 crpix_dec=par.crpix_dec,
153 )
155 pixels = coordinate_util.get_healpix_pixels(
156 par.nside_sampling, w, par.size_x, par.size_y
157 )
158 pixarea = hp.nside2pixarea(par.nside_sampling, degrees=True)
160 dict_besancon_info = load_besancon_map(ctx)
161 cat_stars = get_interp_nearest(
162 ra_new=par.ra0, dec_new=par.dec0, dict_besancon_info=dict_besancon_info
163 )
164 simulation_area = dict_besancon_info["simulation_area"]
166 # Choose stars randomly according to magnitude limits (including Poisson noise),
167 # apply limits for detection band
168 mask = (cat_stars[par.reference_band] >= par.stars_mag_min) & (
169 cat_stars[par.reference_band] <= par.stars_mag_max
170 )
172 cat_stars_select = cat_stars[mask]
174 max_numstars = np.count_nonzero(mask)
176 # Mean number of stars within a Healpix pixel
177 mean_numstars = np.int32(np.around(max_numstars * pixarea / simulation_area))
179 # Loop over pixels
180 for pixel in pixels:
181 # Reseed random library
182 np.random.seed(par.seed + pixel + par.star_num_seed_offset)
183 nstars = np.random.poisson(mean_numstars)
184 while ( 184 ↛ 187line 184 didn't jump to line 187 because the condition on line 184 was never true
185 nstars > max_numstars
186 ): # in case drawn number of stars is larger than number of stars in catalog
187 nstars = np.random.poisson(mean_numstars)
189 # In case the Healpix pixel is empty
190 if nstars == 0:
191 continue
193 # Reseed random library
194 np.random.seed(par.seed + pixel + par.star_dist_seed_offset)
196 # Positions
197 x, y = sample_position_uniform(nstars, w, pixel, par.nside_sampling)
198 ctx.stars.x = np.append(ctx.stars.x, x)
199 ctx.stars.y = np.append(ctx.stars.y, y)
201 # Set magnitudes according to catalog
202 ind = np.random.choice(
203 np.arange(len(cat_stars_select)), size=nstars, replace=False
204 )
206 # Set magnitudes according to catalog
207 for f in par.filters:
208 ctx.stars.magnitude_dict[f] = np.append(
209 ctx.stars.magnitude_dict[f], cat_stars_select[f][ind]
210 )
213def transform_from_sdss_to_gaia_colours(cat):
214 # https://www.aanda.org/articles/aa/pdf/2018/08/aa32756-18.pdf Table A.2
215 colours = np.array(
216 [
217 np.ones(len(cat)),
218 (cat["g"] - cat["i"]),
219 (cat["g"] - cat["i"]) ** 2,
220 (cat["g"] - cat["i"]) ** 3,
221 ]
222 ).T
224 coeffs = np.array([-0.074189, -0.51409, -0.080607, 0.0016001])[np.newaxis, :]
226 G_gaia_minus_g_sdss = np.sum((colours * coeffs), axis=1)
228 gaia_G = cat["g"] + G_gaia_minus_g_sdss
230 return gaia_G
233def get_star_cat_besancon_gaia_splice(ctx):
234 # load Besancon map
236 par = ctx.parameters
237 w = coordinate_util.wcs_from_parameters(par)
238 pixels = coordinate_util.get_healpix_pixels(
239 par.nside_sampling, w, par.size_x, par.size_y
240 )
241 pixarea = hp.nside2pixarea(par.nside_sampling, degrees=True)
243 # get the scaling for star counts
244 if hasattr(par, "stars_counts_scale"): 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true
245 stars_counts_scale = par.stars_counts_scale
246 else:
247 stars_counts_scale = 1
249 # load besancon
250 dict_besancon_info = load_besancon_map_info(ctx)
251 list_ipix_besancon = []
252 for pixel in pixels:
253 theta, phi = hp.pix2ang(par.nside_sampling, pixel)
254 ipix_besancon = hp.ang2pix(dict_besancon_info["nside"], theta, phi)
255 list_ipix_besancon.append(ipix_besancon)
257 besancon_pixels = np.unique(list_ipix_besancon)
258 dict_cat_besancon = load_besancon_map_pixels(ctx, besancon_pixels)
259 for key in dict_cat_besancon:
260 cat = dict_cat_besancon[key]
261 select = (cat[par.reference_band] > par.stars_mag_min) & (
262 cat[par.reference_band] < par.stars_mag_max
263 )
264 cat = cat[select]
266 cat = at.ensure_cols(
267 cat, names=["index:i4", "index_gaia:i4", "pos_x", "pos_y", "gaia_G"]
268 )
269 cat["index"] = np.arange(len(cat))
270 cat["gaia_G"] = transform_from_sdss_to_gaia_colours(cat)
271 dict_cat_besancon[key] = cat
273 # load GAIA
274 with h5py.File(par.filepath_gaia, "r") as fh5:
275 cat_gaia = np.array(fh5["data"])
277 # get image coordinates for GAIA stars
278 cat_gaia = at.ensure_cols(
279 cat_gaia, names=["index:i4", "index_besancon:i4", "x", "y"]
280 )
282 wcs = coordinate_util.tile_in_skycoords(
283 pixscale=par.pixscale,
284 ra0=par.ra0,
285 dec0=par.dec0,
286 crpix_ra=par.crpix_ra,
287 crpix_dec=par.crpix_dec,
288 )
290 cat_gaia["x"], cat_gaia["y"] = wcs.all_world2pix(cat_gaia["ra"], cat_gaia["dec"], 0)
291 cat_gaia["index"] = np.arange(len(cat_gaia))
293 # get areas for both catalogues
294 besancon_area = dict_besancon_info["simulation_area"]
296 # calculate healpix pixels for Gaia stars
298 gaia_hppix = coordinate_util.radec2pix(
299 ra=cat_gaia["ra"], dec=cat_gaia["dec"], nside=par.nside_sampling
300 )
302 list_pixel_besancon = []
304 # Loop over pixels
305 for pixel in pixels:
306 # get the pixel index in the besancon catalogue
308 theta, phi = hp.pix2ang(par.nside_sampling, pixel)
309 ipix_besancon = hp.ang2pix(dict_besancon_info["nside"], theta, phi)
310 cat_besancon = dict_cat_besancon[f"{ipix_besancon:04d}"]
312 # current number of stars according to besancon
314 n_stars_current = np.int32(
315 np.around(len(cat_besancon) * pixarea / besancon_area)
316 )
317 np.random.seed(par.seed + pixel + par.star_num_seed_offset)
318 stars_counts_scale = (
319 par.stars_counts_scale if hasattr(par, "stars_counts_scale") else 1.0
320 )
321 n_stars_current_noise = np.random.poisson(n_stars_current * stars_counts_scale)
322 indices_random = np.random.choice(
323 len(cat_besancon), size=n_stars_current_noise, replace=True
324 )
325 current_besancon = cat_besancon[indices_random].copy()
326 current_besancon = np.sort(current_besancon, order=["g"])
327 np.random.seed(par.seed + pixel + par.star_dist_seed_offset)
328 x, y = sample_position_uniform(
329 n_stars_current_noise, w, pixel, par.nside_sampling
330 )
331 current_besancon["index_gaia"] = -99
332 current_besancon["pos_x"] = x
333 current_besancon["pos_y"] = y
335 # select gaia stars in current pixel
337 current_gaia = cat_gaia[gaia_hppix == pixel].copy()
338 n_current_gaia = len(current_gaia)
339 current_besancon_nogaia = current_besancon[n_current_gaia:]
341 # match gaia and besancon
342 indices_match_besancon = np.argmin(
343 (
344 (
345 current_gaia["phot_g_mean_mag"][:, np.newaxis]
346 - cat_besancon["gaia_G"][np.newaxis]
347 )
348 ** 2
349 ),
350 axis=1,
351 )
352 current_besancon_gaia = cat_besancon[indices_match_besancon].copy()
353 current_besancon_gaia["index_gaia"] = current_gaia["index"]
354 current_besancon_gaia["pos_x"] = current_gaia["x"]
355 current_besancon_gaia["pos_y"] = current_gaia["y"]
357 current_join_cat = np.concatenate(
358 [current_besancon_gaia, current_besancon_nogaia]
359 )
360 list_pixel_besancon.append(current_join_cat)
362 cat_full = np.concatenate(list_pixel_besancon)
363 ctx.stars.x = np.append(ctx.stars.x, cat_full["pos_x"])
364 ctx.stars.y = np.append(ctx.stars.y, cat_full["pos_y"])
366 # Set magnitudes according to catalog
367 for f in par.filters:
368 ctx.stars.magnitude_dict[f] = np.append(
369 ctx.stars.magnitude_dict[f], cat_full[f]
370 )
373STAR_GENERATOR = {
374 "besancon_map": get_star_cat_besancon,
375 "besancon_gaia_splice": get_star_cat_besancon_gaia_splice,
376}
379class Plugin(BasePlugin):
380 """
381 Draw stars for the r-band from a catalog created using the Besancon model of the
382 Milky Way.
384 :param besancon_cat_name: Path to the catalog of stars drawn from the model stored
385 as fits-file. The catalog must contain a column called 'r' giving the magnitude
386 of the stars in the r-band. Furthermore, the header of the HDU in which the
387 catalog is stored must contain a field labeled 'AREA' which states the area (in
388 sq. deg) the catalog covers.
389 :param seed: General seed.
390 :param star_num_seed_offset: Seed offset before number of stars is drawn.
391 :param stars_mag_min: Minimum magnitude of stars in the r-band.
392 :param stars_mag_max: Maximum magnitude of stars in the r-band.
393 :param star_dist_seed_offset: Seed offset before positions of stars are drawn.
394 :param star_nphot_seed_offset: Seed offset before numbers of photons are calculated
395 for stars.
396 :param n_exp: Number of single exposures in coadded image.
397 :param magzero: Magnitude zeropoint.
398 :param gain: Gain in electrons per ADU.
399 :param exp_time_file_name: File name of an exposure time map.
400 :param size_x: Size of the image in x-direction.
401 :param size_y: Size of the image in y-direction.
402 :param maps_remote_dir: Remote directory where maps are stored.
403 """
405 def __call__(self):
406 par = self.ctx.parameters
408 if not hasattr(par, "crpix_ra"):
409 par.crpix_ra = par.size_x / 2 + 0.5
410 if not hasattr(par, "crpix_dec"):
411 par.crpix_dec = par.size_y / 2 + 0.5
413 # Initialize star catalog
414 self.ctx.stars = sampling_util.Catalog()
415 self.ctx.stars.columns = ["id", "x", "y"]
417 self.ctx.stars.x = np.zeros(0, dtype=np.float32)
418 self.ctx.stars.y = np.zeros(0, dtype=np.float32)
419 self.ctx.stars.magnitude_dict = {}
420 for f in par.filters:
421 self.ctx.stars.magnitude_dict[f] = np.zeros(0, dtype=np.float32)
423 # Get magnitudes and position
424 generator = STAR_GENERATOR[par.star_catalogue_type]
425 generator(self.ctx)
427 # Mask stars not on image
428 mask = (
429 (self.ctx.stars.x >= 0)
430 & (self.ctx.stars.x < par.size_x)
431 & (self.ctx.stars.y >= 0)
432 & (self.ctx.stars.y < par.size_y)
433 )
435 self.ctx.numstars = np.sum(mask)
436 self.ctx.stars.id = np.arange(self.ctx.numstars)
438 self.ctx.stars.x = self.ctx.stars.x[mask]
439 self.ctx.stars.y = self.ctx.stars.y[mask]
441 for f in par.filters:
442 self.ctx.stars.magnitude_dict[f] = self.ctx.stars.magnitude_dict[f][mask]
444 def __str__(self):
445 return NAME