Coverage for src/ufig/plugins/match_sextractor_seg_catalog_multiband_read.py: 92%
104 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1# Copyright (c) 2015 ETH Zurich, Institute of Astronomy, Beatrice Moser
2# <moserb@phys.ethz.ch>
4"""
5Created on Dec 21, 2023
6@author: Beatrice Moser
8"""
10import h5py
11import numpy as np
12from cosmic_toolbox import arraytools as at
13from cosmic_toolbox import logger
14from ivy.plugin.base_plugin import BasePlugin
16LOGGER = logger.get_logger(__file__)
17NO_MATCH_VAL = -200
19HDF5_COMPRESS = {"compression": "gzip", "compression_opts": 9, "shuffle": True}
22def match(x_in, y_in, mag_in, segimage, mag_out, bands):
23 """
24 Match UFig input positions and magnitudes to SExtractor output positions and
25 magnitudes.
27 :param x_in: Input x-coordinates (in SExtractor convention)
28 :param y_in: Input y-coordinates (in SExtractor convention)
29 :param mag_in: Input magnitudes
30 :param mag_out: Output magnitudes
31 :param par.mag_diff: Maximum magnitude difference between an input and a detected
32 object to classify as a match.
33 :return: Indices linking detected to input objects.
34 """
36 indices_link = np.full_like(mag_out["i"], NO_MATCH_VAL, dtype=int)
38 x_in = np.array(x_in)
39 y_in = np.array(y_in)
41 # Adjust coordinates if equal to 4200
42 x_in[x_in == np.shape(segimage)[1]] -= 1
43 y_in[y_in == np.shape(segimage)[0]] -= 1
45 # Get unique matches
46 matches = segimage[y_in.astype(int), x_in.astype(int)]
47 unique_matches, match_counts = np.unique(matches, return_counts=True)
49 # Eliminate the unnecessary list creation and optimize the matching process
50 for j in unique_matches[1:]:
51 sel_gals = np.where(matches == j)[0]
52 mag_diff = sum(np.abs(mag_out[f][j - 1] - mag_in[f][sel_gals]) for f in bands)
53 match_index = np.argmin(mag_diff)
54 indices_link[j - 1] = sel_gals[match_index]
56 return indices_link
59class Plugin(BasePlugin):
60 """
61 Matches a Source Extractor catalog to the input catalogs and adds all the input
62 columns to the Source Extractor catalog.
63 """
65 def get_sexcat_paths(self, f):
66 """
67 Get paths of SExtractor catalogs to be matched from list of plugins.
69 :param plugin_list: List of plugins
70 :return: List of paths of SExtractor catalogs to be matched
71 """
73 paths = self.ctx.parameters.sextractor_forced_photo_catalog_name_dict[f]
74 return paths
76 def get_ucat_paths(self, f, sg):
77 if sg == "galaxy":
78 paths = self.ctx.parameters.galaxy_catalog_name_dict[f]
79 elif sg == "star": 79 ↛ 82line 79 didn't jump to line 82 because the condition on line 79 was always true
80 paths = self.ctx.parameters.star_catalog_name_dict[f]
82 return paths
84 def __call__(self):
85 par = self.ctx.parameters
87 indices_link = {}
88 sexcat_dict = {}
89 gal_dict = {}
90 star_dict = {}
91 x_out_dict = {}
92 y_out_dict = {}
93 mag_out_dict = {}
94 x_in = np.array([], float)
95 y_in = np.array([], float)
96 mag_in_dict = {}
97 path_ucat = {}
98 path_sexcat = {}
100 for f in par.filters:
101 if "galaxies" in self.ctx: 101 ↛ 108line 101 didn't jump to line 108 because the condition on line 101 was always true
102 path_ucat[f] = self.get_ucat_paths(f=f, sg="galaxy")
103 gal_dict[f] = at.load_hdf(path_ucat[f])
104 # match
105 x_in = gal_dict[f]["x"]
106 y_in = gal_dict[f]["y"]
107 mag_in_dict[f] = gal_dict[f]["mag"]
108 if "stars" in self.ctx: 108 ↛ 116line 108 didn't jump to line 116 because the condition on line 108 was always true
109 path_ucat[f] = self.get_ucat_paths(f=f, sg="star")
110 star_dict[f] = at.load_hdf(path_ucat[f])
111 x_in = np.append(x_in, star_dict[f]["x"])
112 y_in = np.append(y_in, star_dict[f]["y"])
113 mag_in_dict[f] = np.append(mag_in_dict[f], star_dict[f]["mag"])
114 else:
115 # if no stars are present, create rec array with no data
116 star_dict[f] = np.recarray(
117 0, dtype=[("x", float), ("y", float), ("mag", float)]
118 )
120 path_sexcat[f] = self.get_sexcat_paths(f=f)
121 # load catalog
122 sexcat_dict[f] = at.load_hdf_cols(path_sexcat[f])
123 x_out_dict[f] = sexcat_dict[f][par.matching_x]
124 y_out_dict[f] = sexcat_dict[f][par.matching_y]
125 mag_out_dict[f] = sexcat_dict[f][par.matching_mag]
127 catalog_name_short = path_sexcat["i"].rsplit(".", 1)[0]
128 segimage_name = catalog_name_short + "_seg.h5"
129 segimage = h5py.File(segimage_name, "r")["SEGMENTATION"][:]
131 indices_link = match(
132 x_in, y_in, mag_in_dict, segimage, mag_out_dict, par.filters
133 )
135 # split matches into stars and galaxies
136 galaxy_mask = (indices_link < self.ctx.numgalaxies) & (
137 indices_link != NO_MATCH_VAL
138 )
139 star_mask = (indices_link >= self.ctx.numgalaxies) & (
140 indices_link != NO_MATCH_VAL
141 )
142 galaxy_indices = indices_link[galaxy_mask]
143 star_indices = indices_link[star_mask] - self.ctx.numgalaxies
145 def append_(obj, col):
146 new_columns.append(f"{col}:{obj[col].dtype.str}")
147 new_column_shapes.append(obj[col].shape[1:])
148 new_columns_names.append(col)
150 for f in par.filters:
151 # add columns
152 new_columns = []
153 new_column_shapes = []
154 new_columns_names = []
156 for col in gal_dict[f].dtype.names:
157 if col not in sexcat_dict[f].dtype.names: 157 ↛ 156line 157 didn't jump to line 156 because the condition on line 157 was always true
158 append_(gal_dict[f], col)
160 for col in star_dict[f].dtype.names:
161 if ( 161 ↛ 165line 161 didn't jump to line 165
162 col not in sexcat_dict[f].dtype.names
163 and col not in gal_dict[f].dtype.names
164 ):
165 append_(star_dict[f], col)
167 if "star_gal" not in sexcat_dict[f].dtype.names: 167 ↛ 172line 167 didn't jump to line 172 because the condition on line 167 was always true
168 new_columns.append(f"star_gal:{np.array([NO_MATCH_VAL]).dtype.str}")
169 new_column_shapes.append(())
170 new_columns_names.append("star_gal")
172 sexcat_dict[f] = at.add_cols(
173 sexcat_dict[f], new_columns, shapes=new_column_shapes, data=NO_MATCH_VAL
174 )
176 # add matched values
177 if "star_gal" in new_columns_names: 177 ↛ 182line 177 didn't jump to line 182 because the condition on line 177 was always true
178 sexcat_dict[f]["star_gal"][galaxy_mask] = 0
179 sexcat_dict[f]["star_gal"][star_mask] = 1
180 new_columns_names.remove("star_gal")
182 for new_col in new_columns_names:
183 if new_col in gal_dict[f].dtype.names: 183 ↛ 188line 183 didn't jump to line 188 because the condition on line 183 was always true
184 sexcat_dict[f][new_col][galaxy_mask] = gal_dict[f][new_col][
185 galaxy_indices
186 ]
188 if new_col in star_dict[f].dtype.names: 188 ↛ 182line 188 didn't jump to line 182 because the condition on line 188 was always true
189 sexcat_dict[f][new_col][star_mask] = star_dict[f][new_col][
190 star_indices
191 ]
193 # write extended catalog to disk
194 at.save_hdf_cols(path_sexcat[f], sexcat_dict[f], compression=HDF5_COMPRESS)
196 def __str__(self):
197 return "match catalog to input"