Coverage for src/ufig/psf_estimation/tiled_regressor.py: 95%
105 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1import numpy as np
2from sklearn.base import BaseEstimator, RegressorMixin
3from sklearn.preprocessing import PolynomialFeatures
6class UnderdeterminedError(ValueError):
7 """
8 Raised when trying to fit an underdetermined model.
9 """
12def get_poly_weight_basis(
13 position_xy_transformed, position_weights, poly_order, polynomial_type
14):
15 n_stars, n_pointings = position_xy_transformed.shape[0], position_weights.shape[1]
17 if polynomial_type == "standard":
18 poly = PolynomialFeatures(
19 degree=poly_order, interaction_only=False, include_bias=True
20 )
21 position_xy_basis = poly.fit_transform(position_xy_transformed)
23 elif polynomial_type == "chebyshev":
24 if position_xy_transformed.shape[1] == 1:
25 position_xy_basis = np.polynomial.chebyshev.chebvander(
26 x=position_xy_transformed[:, 0], deg=poly_order
27 )
29 elif position_xy_transformed.shape[1] == 2: 29 ↛ 37line 29 didn't jump to line 37 because the condition on line 29 was always true
30 position_xy_basis = np.polynomial.chebyshev.chebvander2d(
31 x=position_xy_transformed[:, 0],
32 y=position_xy_transformed[:, 1],
33 deg=[poly_order, poly_order],
34 )
36 else:
37 raise NotImplementedError(
38 "Chebyshev interpolation is only implemented for 1- and 2-dim. input."
39 )
41 else:
42 raise ValueError(f"unknown polynomial type {polynomial_type}")
44 n_basis = position_xy_basis.shape[1]
45 n_basis_pointings = n_pointings * n_basis
46 position_weights_basis = np.zeros([n_stars, n_basis_pointings])
48 for ip in range(n_pointings):
49 istart = ip * n_basis
50 iend = istart + n_basis
51 position_weights_basis[:, istart:iend] = (
52 position_weights[:, ip][:, np.newaxis] * position_xy_basis
53 )
55 return position_weights_basis
58def var_to_weight(v):
59 if v is None:
60 return None
61 else:
62 w = 1.0 / (v**2)
63 w[~np.isfinite(w)] = 0
64 w[w < 0] = 0
65 return w
68class TiledRobustPolynomialRegressor(BaseEstimator, RegressorMixin):
69 def __init__(
70 self,
71 poly_order=3,
72 ridge_alpha=0,
73 n_input_dim=2,
74 polynomial_type="standard",
75 poly_coefficients=None,
76 set_unseen_to_mean=False,
77 unseen_pointings=None,
78 raise_underdetermined=False,
79 ):
80 self.poly_order = poly_order
81 self.n_input_dim = n_input_dim
82 self.ridge_alpha = ridge_alpha
83 self.polynomial_type = polynomial_type
84 self.arr_pointings_polycoeffs = poly_coefficients
85 self.unseen_pointings = unseen_pointings
86 self.set_unseen_to_mean = set_unseen_to_mean
87 self.raise_underdetermined = raise_underdetermined
89 def fit(self, X, y, var_y=None, method="ridge"):
90 # check that there are enough data points
91 if (
92 len(y) < self.n_free_params(X.shape[1] - self.n_input_dim)
93 and self.raise_underdetermined
94 ):
95 raise UnderdeterminedError
97 # check shape of ridge_alpha
98 n_output = y.shape[1]
99 try:
100 assert len(self.ridge_alpha) == n_output
101 except TypeError:
102 self.ridge_alpha = np.full(n_output, self.ridge_alpha)
104 position_weights_basis = self._get_basis(X)
105 n_basis_pointings = position_weights_basis.shape[1]
106 self.arr_pointings_polycoeffs = np.empty((n_output, n_basis_pointings))
108 if method == "ridge":
109 from sklearn.linear_model import Ridge
111 reg = Ridge(alpha=self.ridge_alpha, fit_intercept=False)
112 reg.fit(X=position_weights_basis, y=y)
113 self.arr_pointings_polycoeffs = reg.coef_
115 elif method == "huber": 115 ↛ 131line 115 didn't jump to line 131 because the condition on line 115 was always true
116 for io in range(n_output):
117 from sklearn.linear_model import HuberRegressor
119 w = None if var_y is None else var_to_weight(var_y[:, io])
120 reg = HuberRegressor(
121 epsilon=1.01,
122 alpha=self.ridge_alpha[io],
123 fit_intercept=False,
124 max_iter=5000,
125 tol=1e-3,
126 )
127 reg.fit(X=position_weights_basis, y=y[:, io], sample_weight=w)
128 self.arr_pointings_polycoeffs[io] = reg.coef_
130 # set pointings that have not been seen during the fit
131 self.unseen_pointings = self._get_unseen_pointings(X)
133 def predict(self, X, batch_size=1000):
134 n_batches = int(np.ceil(X.shape[0] / batch_size))
135 y = np.full(
136 [X.shape[0], len(self.arr_pointings_polycoeffs)],
137 dtype=np.float32,
138 fill_value=np.nan,
139 )
141 select_predict = self._get_select_predict(X)
143 for ic in range(n_batches):
144 si = ic * batch_size
145 ei = si + batch_size
147 # predict for selected projects
148 if np.any(select_predict[si:ei]): 148 ↛ 143line 148 didn't jump to line 143 because the condition on line 148 was always true
149 y[si:ei] = self._predict_for_seen_pointings(X[si:ei])
151 # set unseen pointings to averages if requested
152 if self.set_unseen_to_mean: 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true
153 y[~select_predict] = np.mean(y[select_predict], axis=0)
155 return y
157 def _predict_for_seen_pointings(self, X):
158 position_weights_basis = self._get_basis(X)
160 nx = X.shape[0]
161 ny = len(self.arr_pointings_polycoeffs)
162 y = np.zeros([nx, ny])
164 # Evaluate interpolation
165 for io in range(ny):
166 y[:, io] = self._evaluate_basis(position_weights_basis, io)
168 return y
170 def _get_basis(self, X):
171 position_xy_transformed = X[:, : self.n_input_dim]
172 position_weights = X[:, self.n_input_dim :]
173 position_weights_basis = get_poly_weight_basis(
174 position_xy_transformed,
175 position_weights,
176 self.poly_order,
177 self.polynomial_type,
178 )
179 return position_weights_basis
181 def _get_unseen_pointings(self, X):
182 position_weights = X[:, self.n_input_dim :]
183 select_unseen = np.all(position_weights == 0, axis=0)
184 empty_pointings = np.flatnonzero(select_unseen)
185 return empty_pointings
187 def _get_select_predict(self, X):
188 # select objects that are not only on pointings which have not been seen during
189 # the fit, this also removes objects that are no pointing at all
190 position_weights_bool = X[:, self.n_input_dim :].astype(bool)
191 seen_only = np.ones_like(position_weights_bool)
192 seen_only[:, self.unseen_pointings] = False
193 select = np.any(position_weights_bool & seen_only, axis=1)
194 return select
196 def _evaluate_basis(self, basis, i_dim):
197 return np.dot(basis, self.arr_pointings_polycoeffs[i_dim])
199 def n_free_params(self, n_pointings):
200 return n_pointings * (self.poly_order + 1) ** 2