Coverage for src/ufig/plugins/add_generic_stamp_flags_ucat.py: 99%

67 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1# Copyright (c) 2016 ETH Zurich, Institute of Astronomy, Tomasz Kacprzak 

2# <tomasz.kacprzak@phys.ethz.ch> 

3""" 

4Created on Aug 5, 2016 

5@author: Tomasz Kacprzak 

6adapted by Silvan Fischbacher, 2024 

7""" 

8 

9import h5py 

10import numpy as np 

11from cosmic_toolbox import arraytools as at 

12from cosmic_toolbox import logger 

13from ivy.plugin.base_plugin import BasePlugin 

14 

15from ufig import mask_utils 

16from ufig.array_util import set_flag_bit 

17from ufig.plugins.add_generic_stamp_flags import ( 

18 FLAGBIT_EDGE_DUPLICATE, 

19 FLAGBIT_NEARBY_BRIGHT_STAR, 

20 FLAGBIT_SURVEY_MASK, 

21 FLAGBIT_SYSMAP_DELTA_WEIGHT, 

22) 

23 

24LOGGER = logger.get_logger(__file__) 

25 

26 

class Plugin(BasePlugin):
    """
    Plugin that runs add_all_stamp_flags on the catalog of the current filter.

    All inputs (catalog file, overlap-block file, mask file and off-mask
    radius) are read from ``self.ctx.parameters``.
    """

    def __call__(self):
        params = self.ctx.parameters

        # NOTE(review): the guard tests "det_clf_catalog_name" while the call
        # below reads "det_clf_catalog_name_dict" - confirm this is intended.
        if not hasattr(params, "det_clf_catalog_name"):
            return

        add_all_stamp_flags(
            filename_cat=params.det_clf_catalog_name_dict[self.ctx.current_filter],
            filename_overlapblock=params.filepath_overlapblock,
            filename_mask=params.sextractor_mask_name_dict[self.ctx.current_filter],
            # the radius handed on is three times the configured value
            off_mask_radius=3 * params.sextractor_catalog_off_mask_radius,
        )

    def __str__(self):
        return "tile overlap/coadd bounadry col for ucat catalog"

41 

42 

def add_all_stamp_flags(
    filename_cat,
    filename_overlapblock,
    filename_mask,
    off_mask_radius,
):
    """
    Apply every stamp flag to the catalog stored in ``filename_cat``.

    The edge-duplicate flag is added first, followed by the flags derived
    from the pixel mask.

    :param filename_cat: path of the catalog file that is updated
    :param filename_overlapblock: path passed on to add_edge_duplicate_flag
    :param filename_mask: path passed on to add_pixel_based_masks
    :param off_mask_radius: radius passed on to add_pixel_based_masks
    """
    add_edge_duplicate_flag(
        filename_cat=filename_cat,
        filename_overlapblock=filename_overlapblock,
    )
    add_pixel_based_masks(
        filename_cat=filename_cat,
        filename_mask=filename_mask,
        off_mask_radius=off_mask_radius,
    )

51 

52 

def save_flags_stamp(filename_cat, flags_stamp):
    """
    Store ``flags_stamp`` as the FLAGS_STAMP column of the "data" dataset.

    The catalog is read into memory, the FLAGS_STAMP column is overwritten
    (or appended if it does not exist yet), and the "data" dataset is then
    replaced by the updated catalog.

    :param filename_cat: path of the HDF5 catalog file, opened in append mode
    :param flags_stamp: array of flag values, one entry per catalog row
    """
    with h5py.File(filename_cat, "a") as f:
        # Work on an in-memory copy instead of the handle of a dataset that
        # has already been unlinked from the file.
        cat = f["data"][...]

        if "FLAGS_STAMP" in cat.dtype.names:
            cat["FLAGS_STAMP"] = flags_stamp
        else:
            cat = at.add_cols(
                cat, ["FLAGS_STAMP"], data=flags_stamp, dtype=flags_stamp.dtype
            )

        # Remove the old dataset only once its replacement is fully built, so
        # that a failure above leaves the existing catalog in place.
        del f["data"]
        f.create_dataset(name="data", data=cat, compression="lzf", shuffle=True)
    LOGGER.info(f"Saved flags to {filename_cat}")

65 

66 

def load_flags_stamp(filename_cat, len_cat):
    """
    Return the FLAGS_STAMP column of the catalog, or zeros if it is missing.

    :param filename_cat: path of the HDF5 catalog file holding a "data" dataset
    :param len_cat: length of the zero-filled uint32 array that is returned
        when the catalog has no FLAGS_STAMP column
    :return: the stored FLAGS_STAMP values or ``np.zeros(len_cat, np.uint32)``
    """
    with h5py.File(filename_cat, "r") as fh5:
        dataset = fh5["data"]
        if "FLAGS_STAMP" not in dataset.dtype.names:
            return np.zeros(len_cat, dtype=np.uint32)
        return dataset["FLAGS_STAMP"]

75 

76 

def add_pixel_based_masks(filename_cat, filename_mask, off_mask_radius, r50_offset=5):
    """
    Set the pixel-mask based bits of FLAGS_STAMP and save them to the catalog.

    For each of the three pixel-mask bits (delta weight, star, survey mask)
    the pixel mask is reduced to that bit with
    ``mask_utils.set_masked_pixels``, converted to a per-object mask with
    ``mask_utils.pixel_mask_to_ucat_catalog_mask``, inverted, and used as the
    selection for the corresponding FLAGS_STAMP bit.

    :param filename_cat: path of the HDF5 catalog file ("data" dataset)
    :param filename_mask: path of the HDF5 file holding the "mask" dataset
    :param off_mask_radius: passed on to pixel_mask_to_ucat_catalog_mask
    :param r50_offset: passed on to pixel_mask_to_ucat_catalog_mask; the
        default of 5 is the value that used to be hard-coded
    """
    with h5py.File(filename_cat, "r") as f:
        cat = np.array(f["data"])
    flags_stamp = load_flags_stamp(filename_cat, len(cat))

    # Pixel mask
    with h5py.File(filename_mask, mode="r") as fh5:
        pixel_mask = fh5["mask"][...]

    # Bit position of each mask type inside the pixel mask
    FLAGBITS = {"delta_weight": 0, "star": 1, "mask": 2}
    n_bits = len(FLAGBITS)

    # FLAGS_STAMP field that each mask type is written to
    flag_fields = {
        "delta_weight": FLAGBIT_SYSMAP_DELTA_WEIGHT,
        "star": FLAGBIT_NEARBY_BRIGHT_STAR,
        "mask": FLAGBIT_SURVEY_MASK,
    }

    # Get masked pixels (one pixel mask per mask type, in FLAGBITS order)
    pixel_masks = {
        name: mask_utils.set_masked_pixels(pixel_mask, [bit], n_bits=n_bits)
        for name, bit in FLAGBITS.items()
    }

    # Transform to catalog mask; the helper's result is inverted to obtain
    # the objects for which the flag is set
    selections = {
        name: ~mask_utils.pixel_mask_to_ucat_catalog_mask(
            pixel_mask_single, cat, off_mask_radius, r50_offset=r50_offset
        )
        for name, pixel_mask_single in pixel_masks.items()
    }

    # Set FLAGS_STAMP
    for name, select in selections.items():
        set_flag_bit(flags=flags_stamp, select=select, field=flag_fields[name])

    save_flags_stamp(filename_cat, flags_stamp)

128 

129 

def add_edge_duplicate_flag(filename_cat, filename_overlapblock):
    """
    Set the edge-duplicate bit of FLAGS_STAMP and save it to the catalog.

    The "x" and "y" columns of the catalog are cast to int32, clipped to the
    valid index range of the "img_mask" image and used to look up that image.
    Objects whose pixel has the value 1 get FLAGBIT_EDGE_DUPLICATE set.

    :param filename_cat: path of the HDF5 catalog file ("data" dataset)
    :param filename_overlapblock: path of the HDF5 file holding "img_mask"
    """
    # Read the catalog once and take both coordinate columns from it
    with h5py.File(filename_cat, "r") as f:
        cat = np.array(f["data"])
    x = cat["x"].astype(np.int32)
    y = cat["y"].astype(np.int32)

    with h5py.File(filename_overlapblock, "r") as f:
        img_mask = np.array(f["img_mask"], dtype=np.int8)

    # Clamp to the last valid index (shape - 1); clamping to the shape itself
    # would index one past the end and raise an IndexError below.
    n_rows, n_cols = img_mask.shape[:2]
    x = np.clip(x, 0, n_cols - 1)
    y = np.clip(y, 0, n_rows - 1)

    is_duplicate = img_mask[y, x] == 1

    flags_stamp = load_flags_stamp(filename_cat, len(is_duplicate))
    set_flag_bit(flags=flags_stamp, select=is_duplicate, field=FLAGBIT_EDGE_DUPLICATE)

    save_flags_stamp(filename_cat, flags_stamp)