Coverage for src/ufig/plugins/match_sextractor_seg_catalog_multiband_read.py: 92%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1# Copyright (c) 2015 ETH Zurich, Institute of Astronomy, Beatrice Moser 

2# <moserb@phys.ethz.ch> 

3 

4""" 

5Created on Dec 21, 2023 

6@author: Beatrice Moser 

7 

8""" 

9 

10import h5py 

11import numpy as np 

12from cosmic_toolbox import arraytools as at 

13from cosmic_toolbox import logger 

14from ivy.plugin.base_plugin import BasePlugin 

15 

16LOGGER = logger.get_logger(__file__) 

17NO_MATCH_VAL = -200 

18 

19HDF5_COMPRESS = {"compression": "gzip", "compression_opts": 9, "shuffle": True} 

20 

21 

22def match(x_in, y_in, mag_in, segimage, mag_out, bands): 

23 """ 

24 Match UFig input positions and magnitudes to SExtractor output positions and 

25 magnitudes. 

26 

27 :param x_in: Input x-coordinates (in SExtractor convention) 

28 :param y_in: Input y-coordinates (in SExtractor convention) 

29 :param mag_in: Input magnitudes 

30 :param mag_out: Output magnitudes 

31 :param par.mag_diff: Maximum magnitude difference between an input and a detected 

32 object to classify as a match. 

33 :return: Indices linking detected to input objects. 

34 """ 

35 

36 indices_link = np.full_like(mag_out["i"], NO_MATCH_VAL, dtype=int) 

37 

38 x_in = np.array(x_in) 

39 y_in = np.array(y_in) 

40 

41 # Adjust coordinates if equal to 4200 

42 x_in[x_in == np.shape(segimage)[1]] -= 1 

43 y_in[y_in == np.shape(segimage)[0]] -= 1 

44 

45 # Get unique matches 

46 matches = segimage[y_in.astype(int), x_in.astype(int)] 

47 unique_matches, match_counts = np.unique(matches, return_counts=True) 

48 

49 # Eliminate the unnecessary list creation and optimize the matching process 

50 for j in unique_matches[1:]: 

51 sel_gals = np.where(matches == j)[0] 

52 mag_diff = sum(np.abs(mag_out[f][j - 1] - mag_in[f][sel_gals]) for f in bands) 

53 match_index = np.argmin(mag_diff) 

54 indices_link[j - 1] = sel_gals[match_index] 

55 

56 return indices_link 

57 

58 

59class Plugin(BasePlugin): 

60 """ 

61 Matches a Source Extractor catalog to the input catalogs and adds all the input 

62 columns to the Source Extractor catalog. 

63 """ 

64 

65 def get_sexcat_paths(self, f): 

66 """ 

67 Get paths of SExtractor catalogs to be matched from list of plugins. 

68 

69 :param plugin_list: List of plugins 

70 :return: List of paths of SExtractor catalogs to be matched 

71 """ 

72 

73 paths = self.ctx.parameters.sextractor_forced_photo_catalog_name_dict[f] 

74 return paths 

75 

76 def get_ucat_paths(self, f, sg): 

77 if sg == "galaxy": 

78 paths = self.ctx.parameters.galaxy_catalog_name_dict[f] 

79 elif sg == "star": 79 ↛ 82line 79 didn't jump to line 82 because the condition on line 79 was always true

80 paths = self.ctx.parameters.star_catalog_name_dict[f] 

81 

82 return paths 

83 

84 def __call__(self): 

85 par = self.ctx.parameters 

86 

87 indices_link = {} 

88 sexcat_dict = {} 

89 gal_dict = {} 

90 star_dict = {} 

91 x_out_dict = {} 

92 y_out_dict = {} 

93 mag_out_dict = {} 

94 x_in = np.array([], float) 

95 y_in = np.array([], float) 

96 mag_in_dict = {} 

97 path_ucat = {} 

98 path_sexcat = {} 

99 

100 for f in par.filters: 

101 if "galaxies" in self.ctx: 101 ↛ 108line 101 didn't jump to line 108 because the condition on line 101 was always true

102 path_ucat[f] = self.get_ucat_paths(f=f, sg="galaxy") 

103 gal_dict[f] = at.load_hdf(path_ucat[f]) 

104 # match 

105 x_in = gal_dict[f]["x"] 

106 y_in = gal_dict[f]["y"] 

107 mag_in_dict[f] = gal_dict[f]["mag"] 

108 if "stars" in self.ctx: 108 ↛ 116line 108 didn't jump to line 116 because the condition on line 108 was always true

109 path_ucat[f] = self.get_ucat_paths(f=f, sg="star") 

110 star_dict[f] = at.load_hdf(path_ucat[f]) 

111 x_in = np.append(x_in, star_dict[f]["x"]) 

112 y_in = np.append(y_in, star_dict[f]["y"]) 

113 mag_in_dict[f] = np.append(mag_in_dict[f], star_dict[f]["mag"]) 

114 else: 

115 # if no stars are present, create rec array with no data 

116 star_dict[f] = np.recarray( 

117 0, dtype=[("x", float), ("y", float), ("mag", float)] 

118 ) 

119 

120 path_sexcat[f] = self.get_sexcat_paths(f=f) 

121 # load catalog 

122 sexcat_dict[f] = at.load_hdf_cols(path_sexcat[f]) 

123 x_out_dict[f] = sexcat_dict[f][par.matching_x] 

124 y_out_dict[f] = sexcat_dict[f][par.matching_y] 

125 mag_out_dict[f] = sexcat_dict[f][par.matching_mag] 

126 

127 catalog_name_short = path_sexcat["i"].rsplit(".", 1)[0] 

128 segimage_name = catalog_name_short + "_seg.h5" 

129 segimage = h5py.File(segimage_name, "r")["SEGMENTATION"][:] 

130 

131 indices_link = match( 

132 x_in, y_in, mag_in_dict, segimage, mag_out_dict, par.filters 

133 ) 

134 

135 # split matches into stars and galaxies 

136 galaxy_mask = (indices_link < self.ctx.numgalaxies) & ( 

137 indices_link != NO_MATCH_VAL 

138 ) 

139 star_mask = (indices_link >= self.ctx.numgalaxies) & ( 

140 indices_link != NO_MATCH_VAL 

141 ) 

142 galaxy_indices = indices_link[galaxy_mask] 

143 star_indices = indices_link[star_mask] - self.ctx.numgalaxies 

144 

145 def append_(obj, col): 

146 new_columns.append(f"{col}:{obj[col].dtype.str}") 

147 new_column_shapes.append(obj[col].shape[1:]) 

148 new_columns_names.append(col) 

149 

150 for f in par.filters: 

151 # add columns 

152 new_columns = [] 

153 new_column_shapes = [] 

154 new_columns_names = [] 

155 

156 for col in gal_dict[f].dtype.names: 

157 if col not in sexcat_dict[f].dtype.names: 157 ↛ 156line 157 didn't jump to line 156 because the condition on line 157 was always true

158 append_(gal_dict[f], col) 

159 

160 for col in star_dict[f].dtype.names: 

161 if ( 161 ↛ 165line 161 didn't jump to line 165

162 col not in sexcat_dict[f].dtype.names 

163 and col not in gal_dict[f].dtype.names 

164 ): 

165 append_(star_dict[f], col) 

166 

167 if "star_gal" not in sexcat_dict[f].dtype.names: 167 ↛ 172line 167 didn't jump to line 172 because the condition on line 167 was always true

168 new_columns.append(f"star_gal:{np.array([NO_MATCH_VAL]).dtype.str}") 

169 new_column_shapes.append(()) 

170 new_columns_names.append("star_gal") 

171 

172 sexcat_dict[f] = at.add_cols( 

173 sexcat_dict[f], new_columns, shapes=new_column_shapes, data=NO_MATCH_VAL 

174 ) 

175 

176 # add matched values 

177 if "star_gal" in new_columns_names: 177 ↛ 182line 177 didn't jump to line 182 because the condition on line 177 was always true

178 sexcat_dict[f]["star_gal"][galaxy_mask] = 0 

179 sexcat_dict[f]["star_gal"][star_mask] = 1 

180 new_columns_names.remove("star_gal") 

181 

182 for new_col in new_columns_names: 

183 if new_col in gal_dict[f].dtype.names: 183 ↛ 188line 183 didn't jump to line 188 because the condition on line 183 was always true

184 sexcat_dict[f][new_col][galaxy_mask] = gal_dict[f][new_col][ 

185 galaxy_indices 

186 ] 

187 

188 if new_col in star_dict[f].dtype.names: 188 ↛ 182line 188 didn't jump to line 182 because the condition on line 188 was always true

189 sexcat_dict[f][new_col][star_mask] = star_dict[f][new_col][ 

190 star_indices 

191 ] 

192 

193 # write extended catalog to disk 

194 at.save_hdf_cols(path_sexcat[f], sexcat_dict[f], compression=HDF5_COMPRESS) 

195 

196 def __str__(self): 

197 return "match catalog to input"