Coverage for src/ufig/plugins/add_generic_stamp_flags.py: 99%
67 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1# Copyright (c) 2016 ETH Zurich, Institute of Astronomy, Tomasz Kacprzak
2# <tomasz.kacprzak@phys.ethz.ch>
3"""
4Created on Aug 5, 2016
5@author: Tomasz Kacprzak
7"""
9import h5py
10import numpy as np
11from cosmic_toolbox import arraytools as at
12from cosmic_toolbox import logger
13from ivy.plugin.base_plugin import BasePlugin
15from ufig import mask_utils
16from ufig.array_util import set_flag_bit
18LOGGER = logger.get_logger(__file__)
20FLAGBIT_EDGE_DUPLICATE = 1
21FLAGBIT_COADD_BOUNDARY = 2
22FLAGBIT_IMAGE_BOUNDARY = 3
23FLAGBIT_SYSMAP_DELTA_WEIGHT = 4
24FLAGBIT_NEARBY_BRIGHT_STAR = 5
25FLAGBIT_SURVEY_MASK = 6
28class Plugin(BasePlugin):
29 def __call__(self):
30 par = self.ctx.parameters
32 if hasattr(par, "sextractor_forced_photo_catalog_name"): 32 ↛ exitline 32 didn't return from function '__call__' because the condition on line 32 was always true
33 add_all_stamp_flags(
34 par.sextractor_catalog_name_dict[self.ctx.current_filter],
35 par.filepath_overlapblock,
36 par.sextractor_mask_name_dict[self.ctx.current_filter],
37 par.sextractor_catalog_off_mask_radius,
38 )
40 def __str__(self):
41 return "tile overlap/coadd bounadry col"
44def add_all_stamp_flags(
45 filename_cat,
46 filename_overlapblock,
47 filename_mask,
48 off_mask_radius,
49):
50 add_edge_duplicate_flag(filename_cat, filename_overlapblock)
51 add_pixel_based_masks(filename_cat, filename_mask, off_mask_radius)
54def load_flags_stamp(filename_cat, len_cat):
55 with h5py.File(filename_cat, "r") as f:
56 if "FLAGS_STAMP" in f:
57 flags_stamp = np.array(f["FLAGS_STAMP"])
58 else:
59 flags_stamp = np.zeros(len_cat, dtype=np.uint32)
60 return flags_stamp
63def save_flags_stamp(filename_cat, flags_stamp):
64 with h5py.File(filename_cat, "a") as f:
65 if "FLAGS_STAMP" in f:
66 del f["FLAGS_STAMP"]
67 f.create_dataset(name="FLAGS_STAMP", data=flags_stamp, compression="lzf")
68 LOGGER.info(f"Saved flags to {filename_cat}")
71def add_pixel_based_masks(filename_cat, filename_mask, off_mask_radius):
72 cat = at.load_hdf_cols(
73 filename_cat, columns=["XWIN_IMAGE", "YWIN_IMAGE", "FLUX_RADIUS"]
74 )
75 flags_stamp = load_flags_stamp(filename_cat, len(cat))
77 # Pixel mask
78 with h5py.File(filename_mask, mode="r") as fh5:
79 pixel_mask = fh5["mask"][...]
81 FLAGBITS = {"delta_weight": 0, "star": 1, "mask": 2}
82 n_bits = len(FLAGBITS)
84 # Get masked pixels
85 pixel_mask_sysmap_delta_weight = mask_utils.set_masked_pixels(
86 pixel_mask, [FLAGBITS["delta_weight"]], n_bits=n_bits
87 )
88 pixel_mask_nearby_star = mask_utils.set_masked_pixels(
89 pixel_mask, [FLAGBITS["star"]], n_bits=n_bits
90 )
91 pixel_mask_survey_mask = mask_utils.set_masked_pixels(
92 pixel_mask, [FLAGBITS["mask"]], n_bits=n_bits
93 )
95 # Transform to catalog mask
96 select_sysmap_delta_weight = ~mask_utils.pixel_mask_to_catalog_mask(
97 pixel_mask_sysmap_delta_weight, cat, off_mask_radius
98 )
99 select_mask_nearby_star = ~mask_utils.pixel_mask_to_catalog_mask(
100 pixel_mask_nearby_star, cat, off_mask_radius
101 )
102 select_survey_mask = ~mask_utils.pixel_mask_to_catalog_mask(
103 pixel_mask_survey_mask, cat, off_mask_radius
104 )
106 # Set FLAGS_STAMP
107 set_flag_bit(
108 flags=flags_stamp,
109 select=select_sysmap_delta_weight,
110 field=FLAGBIT_SYSMAP_DELTA_WEIGHT,
111 )
112 set_flag_bit(
113 flags=flags_stamp,
114 select=select_mask_nearby_star,
115 field=FLAGBIT_NEARBY_BRIGHT_STAR,
116 )
117 set_flag_bit(
118 flags=flags_stamp, select=select_survey_mask, field=FLAGBIT_SURVEY_MASK
119 )
121 save_flags_stamp(filename_cat, flags_stamp)
124def add_edge_duplicate_flag(filename_cat, filename_overlapblock):
125 with h5py.File(filename_cat, "r") as f:
126 x = np.int32(np.array(f["X_IMAGE"])) - 1
127 y = np.int32(np.array(f["Y_IMAGE"])) - 1
129 with h5py.File(filename_overlapblock, "r") as f:
130 img_mask = np.array(f["img_mask"], dtype=np.int8)
132 x[x < 0] = 0
133 y[y < 0] = 0
134 x[x >= img_mask.shape[1]] = img_mask.shape[1]
135 y[y >= img_mask.shape[0]] = img_mask.shape[0]
137 is_duplicate = img_mask[y, x] == 1
139 flags_stamp = load_flags_stamp(filename_cat, len(is_duplicate))
140 set_flag_bit(flags=flags_stamp, select=is_duplicate, field=FLAGBIT_EDGE_DUPLICATE)
142 save_flags_stamp(filename_cat, flags_stamp)