Coverage for src/ufig/psf_estimation/tiled_regressor.py: 95%
116 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
1import numpy as np
2from sklearn.base import BaseEstimator, RegressorMixin
3from sklearn.preprocessing import PolynomialFeatures
6class UnderdeterminedError(ValueError):
7 """
8 Raised when trying to fit an underdetermined model.
9 """
12def get_poly_weight_basis(
13 position_xy_transformed, position_weights, poly_order, polynomial_type
14):
15 n_stars, n_pointings = position_xy_transformed.shape[0], position_weights.shape[1]
17 if polynomial_type == "standard":
18 # Ensure poly_order is an integer, handling numpy scalars
19 if isinstance(poly_order, np.ndarray):
20 if poly_order.ndim == 0: # 0-dimensional array (scalar) 20 ↛ 23line 20 didn't jump to line 23 because the condition on line 20 was always true
21 poly_order = int(poly_order.item())
22 else:
23 poly_order = int(poly_order[0])
24 elif isinstance(poly_order, (list, tuple)): 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true
25 poly_order = int(poly_order[0])
26 else:
27 poly_order = int(poly_order)
29 poly = PolynomialFeatures(
30 degree=poly_order, interaction_only=False, include_bias=True
31 )
32 position_xy_basis = poly.fit_transform(position_xy_transformed)
34 elif polynomial_type == "chebyshev":
35 if position_xy_transformed.shape[1] == 1:
36 position_xy_basis = np.polynomial.chebyshev.chebvander(
37 x=position_xy_transformed[:, 0], deg=poly_order
38 )
40 elif position_xy_transformed.shape[1] == 2: 40 ↛ 48line 40 didn't jump to line 48 because the condition on line 40 was always true
41 position_xy_basis = np.polynomial.chebyshev.chebvander2d(
42 x=position_xy_transformed[:, 0],
43 y=position_xy_transformed[:, 1],
44 deg=[poly_order, poly_order],
45 )
47 else:
48 raise NotImplementedError(
49 "Chebyshev interpolation is only implemented for 1- and 2-dim. input."
50 )
52 else:
53 raise ValueError(f"unknown polynomial type {polynomial_type}")
55 n_basis = position_xy_basis.shape[1]
56 n_basis_pointings = n_pointings * n_basis
57 position_weights_basis = np.zeros([n_stars, n_basis_pointings])
59 for ip in range(n_pointings):
60 istart = ip * n_basis
61 iend = istart + n_basis
62 position_weights_basis[:, istart:iend] = (
63 position_weights[:, ip][:, np.newaxis] * position_xy_basis
64 )
66 return position_weights_basis
69def var_to_weight(v):
70 if v is None:
71 return None
72 else:
73 w = 1.0 / (v**2)
74 w[~np.isfinite(w)] = 0
75 w[w < 0] = 0
76 return w
79class TiledRobustPolynomialRegressor(BaseEstimator, RegressorMixin):
80 def __init__(
81 self,
82 poly_order=3,
83 ridge_alpha=0,
84 n_input_dim=2,
85 polynomial_type="standard",
86 poly_coefficients=None,
87 set_unseen_to_mean=False,
88 unseen_pointings=None,
89 raise_underdetermined=False,
90 ):
91 self.poly_order = poly_order
92 self.n_input_dim = n_input_dim
93 self.ridge_alpha = ridge_alpha
94 self.polynomial_type = polynomial_type
95 self.arr_pointings_polycoeffs = poly_coefficients
96 self.unseen_pointings = unseen_pointings
97 self.set_unseen_to_mean = set_unseen_to_mean
98 self.raise_underdetermined = raise_underdetermined
100 def fit(self, X, y, var_y=None, method="ridge"):
101 # check that there are enough data points
102 if (
103 len(y) < self.n_free_params(X.shape[1] - self.n_input_dim)
104 and self.raise_underdetermined
105 ):
106 raise UnderdeterminedError
108 # check shape of ridge_alpha
109 n_output = y.shape[1]
110 try:
111 assert len(self.ridge_alpha) == n_output
112 # Convert list to numpy array for sklearn compatibility
113 self.ridge_alpha = np.array(self.ridge_alpha)
114 except TypeError:
115 self.ridge_alpha = np.full(n_output, self.ridge_alpha)
117 position_weights_basis = self._get_basis(X)
118 n_basis_pointings = position_weights_basis.shape[1]
119 self.arr_pointings_polycoeffs = np.empty((n_output, n_basis_pointings))
121 if method == "ridge":
122 from sklearn.linear_model import Ridge
124 reg = Ridge(alpha=self.ridge_alpha, fit_intercept=False)
125 reg.fit(X=position_weights_basis, y=y)
126 self.arr_pointings_polycoeffs = reg.coef_
128 elif method == "huber": 128 ↛ 144line 128 didn't jump to line 144 because the condition on line 128 was always true
129 for io in range(n_output):
130 from sklearn.linear_model import HuberRegressor
132 w = None if var_y is None else var_to_weight(var_y[:, io])
133 reg = HuberRegressor(
134 epsilon=1.01,
135 alpha=self.ridge_alpha[io],
136 fit_intercept=False,
137 max_iter=5000,
138 tol=1e-3,
139 )
140 reg.fit(X=position_weights_basis, y=y[:, io], sample_weight=w)
141 self.arr_pointings_polycoeffs[io] = reg.coef_
143 # set pointings that have not been seen during the fit
144 self.unseen_pointings = self._get_unseen_pointings(X)
146 def predict(self, X, batch_size=1000):
147 n_batches = int(np.ceil(X.shape[0] / batch_size))
148 y = np.full(
149 [X.shape[0], len(self.arr_pointings_polycoeffs)],
150 dtype=np.float32,
151 fill_value=np.nan,
152 )
154 select_predict = self._get_select_predict(X)
156 for ic in range(n_batches):
157 si = ic * batch_size
158 ei = si + batch_size
160 # Only predict for objects that should be predicted in this batch
161 batch_select = select_predict[si:ei]
162 if np.any(batch_select): 162 ↛ 156line 162 didn't jump to line 156 because the condition on line 162 was always true
163 # Get batch data
164 X_batch = X[si:ei]
166 # Predict for the full batch
167 y_batch = self._predict_for_seen_pointings(X_batch)
169 # Only keep predictions for objects that should be predicted
170 y[si:ei][batch_select] = y_batch[batch_select]
172 # set unseen pointings to averages if requested
173 if self.set_unseen_to_mean:
174 y[~select_predict] = np.mean(y[select_predict], axis=0)
176 return y
178 def _predict_for_seen_pointings(self, X):
179 position_weights_basis = self._get_basis(X)
181 nx = X.shape[0]
182 ny = len(self.arr_pointings_polycoeffs)
183 y = np.zeros([nx, ny])
185 # Evaluate interpolation
186 for io in range(ny):
187 y[:, io] = self._evaluate_basis(position_weights_basis, io)
189 return y
191 def _get_basis(self, X):
192 position_xy_transformed = X[:, : self.n_input_dim]
193 position_weights = X[:, self.n_input_dim :]
194 position_weights_basis = get_poly_weight_basis(
195 position_xy_transformed,
196 position_weights,
197 self.poly_order,
198 self.polynomial_type,
199 )
200 return position_weights_basis
202 def _get_unseen_pointings(self, X):
203 position_weights = X[:, self.n_input_dim :]
204 select_unseen = np.all(position_weights == 0, axis=0)
205 empty_pointings = np.flatnonzero(select_unseen)
206 return empty_pointings
208 def _get_select_predict(self, X):
209 # select objects that are not only on pointings which have not been seen during
210 # the fit, this also removes objects that are no pointing at all
211 position_weights_bool = X[:, self.n_input_dim :].astype(bool)
212 seen_only = np.ones_like(position_weights_bool)
213 seen_only[:, self.unseen_pointings] = False
214 select = np.any(position_weights_bool & seen_only, axis=1)
215 return select
217 def _evaluate_basis(self, basis, i_dim):
218 return np.dot(basis, self.arr_pointings_polycoeffs[i_dim])
220 def n_free_params(self, n_pointings):
221 return n_pointings * (self.poly_order + 1) ** 2