Coverage for src/ufig/psf_estimation/tiled_regressor.py: 95%

105 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1import numpy as np 

2from sklearn.base import BaseEstimator, RegressorMixin 

3from sklearn.preprocessing import PolynomialFeatures 

4 

5 

6class UnderdeterminedError(ValueError): 

7 """ 

8 Raised when trying to fit an underdetermined model. 

9 """ 

10 

11 

12def get_poly_weight_basis( 

13 position_xy_transformed, position_weights, poly_order, polynomial_type 

14): 

15 n_stars, n_pointings = position_xy_transformed.shape[0], position_weights.shape[1] 

16 

17 if polynomial_type == "standard": 

18 poly = PolynomialFeatures( 

19 degree=poly_order, interaction_only=False, include_bias=True 

20 ) 

21 position_xy_basis = poly.fit_transform(position_xy_transformed) 

22 

23 elif polynomial_type == "chebyshev": 

24 if position_xy_transformed.shape[1] == 1: 

25 position_xy_basis = np.polynomial.chebyshev.chebvander( 

26 x=position_xy_transformed[:, 0], deg=poly_order 

27 ) 

28 

29 elif position_xy_transformed.shape[1] == 2: 29 ↛ 37line 29 didn't jump to line 37 because the condition on line 29 was always true

30 position_xy_basis = np.polynomial.chebyshev.chebvander2d( 

31 x=position_xy_transformed[:, 0], 

32 y=position_xy_transformed[:, 1], 

33 deg=[poly_order, poly_order], 

34 ) 

35 

36 else: 

37 raise NotImplementedError( 

38 "Chebyshev interpolation is only implemented for 1- and 2-dim. input." 

39 ) 

40 

41 else: 

42 raise ValueError(f"unknown polynomial type {polynomial_type}") 

43 

44 n_basis = position_xy_basis.shape[1] 

45 n_basis_pointings = n_pointings * n_basis 

46 position_weights_basis = np.zeros([n_stars, n_basis_pointings]) 

47 

48 for ip in range(n_pointings): 

49 istart = ip * n_basis 

50 iend = istart + n_basis 

51 position_weights_basis[:, istart:iend] = ( 

52 position_weights[:, ip][:, np.newaxis] * position_xy_basis 

53 ) 

54 

55 return position_weights_basis 

56 

57 

58def var_to_weight(v): 

59 if v is None: 

60 return None 

61 else: 

62 w = 1.0 / (v**2) 

63 w[~np.isfinite(w)] = 0 

64 w[w < 0] = 0 

65 return w 

66 

67 

68class TiledRobustPolynomialRegressor(BaseEstimator, RegressorMixin): 

69 def __init__( 

70 self, 

71 poly_order=3, 

72 ridge_alpha=0, 

73 n_input_dim=2, 

74 polynomial_type="standard", 

75 poly_coefficients=None, 

76 set_unseen_to_mean=False, 

77 unseen_pointings=None, 

78 raise_underdetermined=False, 

79 ): 

80 self.poly_order = poly_order 

81 self.n_input_dim = n_input_dim 

82 self.ridge_alpha = ridge_alpha 

83 self.polynomial_type = polynomial_type 

84 self.arr_pointings_polycoeffs = poly_coefficients 

85 self.unseen_pointings = unseen_pointings 

86 self.set_unseen_to_mean = set_unseen_to_mean 

87 self.raise_underdetermined = raise_underdetermined 

88 

89 def fit(self, X, y, var_y=None, method="ridge"): 

90 # check that there are enough data points 

91 if ( 

92 len(y) < self.n_free_params(X.shape[1] - self.n_input_dim) 

93 and self.raise_underdetermined 

94 ): 

95 raise UnderdeterminedError 

96 

97 # check shape of ridge_alpha 

98 n_output = y.shape[1] 

99 try: 

100 assert len(self.ridge_alpha) == n_output 

101 except TypeError: 

102 self.ridge_alpha = np.full(n_output, self.ridge_alpha) 

103 

104 position_weights_basis = self._get_basis(X) 

105 n_basis_pointings = position_weights_basis.shape[1] 

106 self.arr_pointings_polycoeffs = np.empty((n_output, n_basis_pointings)) 

107 

108 if method == "ridge": 

109 from sklearn.linear_model import Ridge 

110 

111 reg = Ridge(alpha=self.ridge_alpha, fit_intercept=False) 

112 reg.fit(X=position_weights_basis, y=y) 

113 self.arr_pointings_polycoeffs = reg.coef_ 

114 

115 elif method == "huber": 115 ↛ 131line 115 didn't jump to line 131 because the condition on line 115 was always true

116 for io in range(n_output): 

117 from sklearn.linear_model import HuberRegressor 

118 

119 w = None if var_y is None else var_to_weight(var_y[:, io]) 

120 reg = HuberRegressor( 

121 epsilon=1.01, 

122 alpha=self.ridge_alpha[io], 

123 fit_intercept=False, 

124 max_iter=5000, 

125 tol=1e-3, 

126 ) 

127 reg.fit(X=position_weights_basis, y=y[:, io], sample_weight=w) 

128 self.arr_pointings_polycoeffs[io] = reg.coef_ 

129 

130 # set pointings that have not been seen during the fit 

131 self.unseen_pointings = self._get_unseen_pointings(X) 

132 

133 def predict(self, X, batch_size=1000): 

134 n_batches = int(np.ceil(X.shape[0] / batch_size)) 

135 y = np.full( 

136 [X.shape[0], len(self.arr_pointings_polycoeffs)], 

137 dtype=np.float32, 

138 fill_value=np.nan, 

139 ) 

140 

141 select_predict = self._get_select_predict(X) 

142 

143 for ic in range(n_batches): 

144 si = ic * batch_size 

145 ei = si + batch_size 

146 

147 # predict for selected projects 

148 if np.any(select_predict[si:ei]): 148 ↛ 143line 148 didn't jump to line 143 because the condition on line 148 was always true

149 y[si:ei] = self._predict_for_seen_pointings(X[si:ei]) 

150 

151 # set unseen pointings to averages if requested 

152 if self.set_unseen_to_mean: 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true

153 y[~select_predict] = np.mean(y[select_predict], axis=0) 

154 

155 return y 

156 

157 def _predict_for_seen_pointings(self, X): 

158 position_weights_basis = self._get_basis(X) 

159 

160 nx = X.shape[0] 

161 ny = len(self.arr_pointings_polycoeffs) 

162 y = np.zeros([nx, ny]) 

163 

164 # Evaluate interpolation 

165 for io in range(ny): 

166 y[:, io] = self._evaluate_basis(position_weights_basis, io) 

167 

168 return y 

169 

170 def _get_basis(self, X): 

171 position_xy_transformed = X[:, : self.n_input_dim] 

172 position_weights = X[:, self.n_input_dim :] 

173 position_weights_basis = get_poly_weight_basis( 

174 position_xy_transformed, 

175 position_weights, 

176 self.poly_order, 

177 self.polynomial_type, 

178 ) 

179 return position_weights_basis 

180 

181 def _get_unseen_pointings(self, X): 

182 position_weights = X[:, self.n_input_dim :] 

183 select_unseen = np.all(position_weights == 0, axis=0) 

184 empty_pointings = np.flatnonzero(select_unseen) 

185 return empty_pointings 

186 

187 def _get_select_predict(self, X): 

188 # select objects that are not only on pointings which have not been seen during 

189 # the fit, this also removes objects that are no pointing at all 

190 position_weights_bool = X[:, self.n_input_dim :].astype(bool) 

191 seen_only = np.ones_like(position_weights_bool) 

192 seen_only[:, self.unseen_pointings] = False 

193 select = np.any(position_weights_bool & seen_only, axis=1) 

194 return select 

195 

196 def _evaluate_basis(self, basis, i_dim): 

197 return np.dot(basis, self.arr_pointings_polycoeffs[i_dim]) 

198 

199 def n_free_params(self, n_pointings): 

200 return n_pointings * (self.poly_order + 1) ** 2