Coverage for src/ufig/psf_estimation/tiled_regressor.py: 95%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 15:17 +0000

1import numpy as np 

2from sklearn.base import BaseEstimator, RegressorMixin 

3from sklearn.preprocessing import PolynomialFeatures 

4 

5 

6class UnderdeterminedError(ValueError): 

7 """ 

8 Raised when trying to fit an underdetermined model. 

9 """ 

10 

11 

12def get_poly_weight_basis( 

13 position_xy_transformed, position_weights, poly_order, polynomial_type 

14): 

15 n_stars, n_pointings = position_xy_transformed.shape[0], position_weights.shape[1] 

16 

17 if polynomial_type == "standard": 

18 # Ensure poly_order is an integer, handling numpy scalars 

19 if isinstance(poly_order, np.ndarray): 

20 if poly_order.ndim == 0: # 0-dimensional array (scalar) 20 ↛ 23line 20 didn't jump to line 23 because the condition on line 20 was always true

21 poly_order = int(poly_order.item()) 

22 else: 

23 poly_order = int(poly_order[0]) 

24 elif isinstance(poly_order, (list, tuple)): 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true

25 poly_order = int(poly_order[0]) 

26 else: 

27 poly_order = int(poly_order) 

28 

29 poly = PolynomialFeatures( 

30 degree=poly_order, interaction_only=False, include_bias=True 

31 ) 

32 position_xy_basis = poly.fit_transform(position_xy_transformed) 

33 

34 elif polynomial_type == "chebyshev": 

35 if position_xy_transformed.shape[1] == 1: 

36 position_xy_basis = np.polynomial.chebyshev.chebvander( 

37 x=position_xy_transformed[:, 0], deg=poly_order 

38 ) 

39 

40 elif position_xy_transformed.shape[1] == 2: 40 ↛ 48line 40 didn't jump to line 48 because the condition on line 40 was always true

41 position_xy_basis = np.polynomial.chebyshev.chebvander2d( 

42 x=position_xy_transformed[:, 0], 

43 y=position_xy_transformed[:, 1], 

44 deg=[poly_order, poly_order], 

45 ) 

46 

47 else: 

48 raise NotImplementedError( 

49 "Chebyshev interpolation is only implemented for 1- and 2-dim. input." 

50 ) 

51 

52 else: 

53 raise ValueError(f"unknown polynomial type {polynomial_type}") 

54 

55 n_basis = position_xy_basis.shape[1] 

56 n_basis_pointings = n_pointings * n_basis 

57 position_weights_basis = np.zeros([n_stars, n_basis_pointings]) 

58 

59 for ip in range(n_pointings): 

60 istart = ip * n_basis 

61 iend = istart + n_basis 

62 position_weights_basis[:, istart:iend] = ( 

63 position_weights[:, ip][:, np.newaxis] * position_xy_basis 

64 ) 

65 

66 return position_weights_basis 

67 

68 

69def var_to_weight(v): 

70 if v is None: 

71 return None 

72 else: 

73 w = 1.0 / (v**2) 

74 w[~np.isfinite(w)] = 0 

75 w[w < 0] = 0 

76 return w 

77 

78 

79class TiledRobustPolynomialRegressor(BaseEstimator, RegressorMixin): 

80 def __init__( 

81 self, 

82 poly_order=3, 

83 ridge_alpha=0, 

84 n_input_dim=2, 

85 polynomial_type="standard", 

86 poly_coefficients=None, 

87 set_unseen_to_mean=False, 

88 unseen_pointings=None, 

89 raise_underdetermined=False, 

90 ): 

91 self.poly_order = poly_order 

92 self.n_input_dim = n_input_dim 

93 self.ridge_alpha = ridge_alpha 

94 self.polynomial_type = polynomial_type 

95 self.arr_pointings_polycoeffs = poly_coefficients 

96 self.unseen_pointings = unseen_pointings 

97 self.set_unseen_to_mean = set_unseen_to_mean 

98 self.raise_underdetermined = raise_underdetermined 

99 

100 def fit(self, X, y, var_y=None, method="ridge"): 

101 # check that there are enough data points 

102 if ( 

103 len(y) < self.n_free_params(X.shape[1] - self.n_input_dim) 

104 and self.raise_underdetermined 

105 ): 

106 raise UnderdeterminedError 

107 

108 # check shape of ridge_alpha 

109 n_output = y.shape[1] 

110 try: 

111 assert len(self.ridge_alpha) == n_output 

112 # Convert list to numpy array for sklearn compatibility 

113 self.ridge_alpha = np.array(self.ridge_alpha) 

114 except TypeError: 

115 self.ridge_alpha = np.full(n_output, self.ridge_alpha) 

116 

117 position_weights_basis = self._get_basis(X) 

118 n_basis_pointings = position_weights_basis.shape[1] 

119 self.arr_pointings_polycoeffs = np.empty((n_output, n_basis_pointings)) 

120 

121 if method == "ridge": 

122 from sklearn.linear_model import Ridge 

123 

124 reg = Ridge(alpha=self.ridge_alpha, fit_intercept=False) 

125 reg.fit(X=position_weights_basis, y=y) 

126 self.arr_pointings_polycoeffs = reg.coef_ 

127 

128 elif method == "huber": 128 ↛ 144line 128 didn't jump to line 144 because the condition on line 128 was always true

129 for io in range(n_output): 

130 from sklearn.linear_model import HuberRegressor 

131 

132 w = None if var_y is None else var_to_weight(var_y[:, io]) 

133 reg = HuberRegressor( 

134 epsilon=1.01, 

135 alpha=self.ridge_alpha[io], 

136 fit_intercept=False, 

137 max_iter=5000, 

138 tol=1e-3, 

139 ) 

140 reg.fit(X=position_weights_basis, y=y[:, io], sample_weight=w) 

141 self.arr_pointings_polycoeffs[io] = reg.coef_ 

142 

143 # set pointings that have not been seen during the fit 

144 self.unseen_pointings = self._get_unseen_pointings(X) 

145 

146 def predict(self, X, batch_size=1000): 

147 n_batches = int(np.ceil(X.shape[0] / batch_size)) 

148 y = np.full( 

149 [X.shape[0], len(self.arr_pointings_polycoeffs)], 

150 dtype=np.float32, 

151 fill_value=np.nan, 

152 ) 

153 

154 select_predict = self._get_select_predict(X) 

155 

156 for ic in range(n_batches): 

157 si = ic * batch_size 

158 ei = si + batch_size 

159 

160 # Only predict for objects that should be predicted in this batch 

161 batch_select = select_predict[si:ei] 

162 if np.any(batch_select): 162 ↛ 156line 162 didn't jump to line 156 because the condition on line 162 was always true

163 # Get batch data 

164 X_batch = X[si:ei] 

165 

166 # Predict for the full batch 

167 y_batch = self._predict_for_seen_pointings(X_batch) 

168 

169 # Only keep predictions for objects that should be predicted 

170 y[si:ei][batch_select] = y_batch[batch_select] 

171 

172 # set unseen pointings to averages if requested 

173 if self.set_unseen_to_mean: 

174 y[~select_predict] = np.mean(y[select_predict], axis=0) 

175 

176 return y 

177 

178 def _predict_for_seen_pointings(self, X): 

179 position_weights_basis = self._get_basis(X) 

180 

181 nx = X.shape[0] 

182 ny = len(self.arr_pointings_polycoeffs) 

183 y = np.zeros([nx, ny]) 

184 

185 # Evaluate interpolation 

186 for io in range(ny): 

187 y[:, io] = self._evaluate_basis(position_weights_basis, io) 

188 

189 return y 

190 

191 def _get_basis(self, X): 

192 position_xy_transformed = X[:, : self.n_input_dim] 

193 position_weights = X[:, self.n_input_dim :] 

194 position_weights_basis = get_poly_weight_basis( 

195 position_xy_transformed, 

196 position_weights, 

197 self.poly_order, 

198 self.polynomial_type, 

199 ) 

200 return position_weights_basis 

201 

202 def _get_unseen_pointings(self, X): 

203 position_weights = X[:, self.n_input_dim :] 

204 select_unseen = np.all(position_weights == 0, axis=0) 

205 empty_pointings = np.flatnonzero(select_unseen) 

206 return empty_pointings 

207 

208 def _get_select_predict(self, X): 

209 # select objects that are not only on pointings which have not been seen during 

210 # the fit, this also removes objects that are no pointing at all 

211 position_weights_bool = X[:, self.n_input_dim :].astype(bool) 

212 seen_only = np.ones_like(position_weights_bool) 

213 seen_only[:, self.unseen_pointings] = False 

214 select = np.any(position_weights_bool & seen_only, axis=1) 

215 return select 

216 

217 def _evaluate_basis(self, basis, i_dim): 

218 return np.dot(basis, self.arr_pointings_polycoeffs[i_dim]) 

219 

220 def n_free_params(self, n_pointings): 

221 return n_pointings * (self.poly_order + 1) ** 2