Coverage for src/ufig/psf_estimation/psf_estimation_coadd_cnn.py: 90%
36 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1import h5py
2import numpy as np
3from cosmic_toolbox import arraytools as at
4from cosmic_toolbox import logger
6from ufig.psf_estimation import psf_utils
7from ufig.psf_estimation.tiled_regressor import (
8 TiledRobustPolynomialRegressor as Regressor,
9)
11LOGGER = logger.get_logger(__file__)
14def predict_psf(position_xy, position_weights, regressor, settings, n_per_chunk=1000):
15 position_xy_transformed = psf_utils.transform_forward(
16 position_xy, scale=settings["scale_pos"]
17 )
18 position_xy_transformed_weights = np.concatenate(
19 [position_xy_transformed, position_weights], axis=1
20 )
21 position_par_post = regressor.predict(
22 position_xy_transformed_weights, batch_size=n_per_chunk
23 )
24 position_par_post[:] = psf_utils.transform_inverse(
25 position_par_post, settings["scale_par"]
26 )
27 select_no_coverage = (position_weights.sum(axis=1) == 0) | np.any(
28 ~np.isfinite(position_par_post), axis=1
29 )
30 position_par_post[select_no_coverage] = 0
32 return position_par_post, select_no_coverage
35def predict_psf_with_file(position_xy, filepath_psfmodel, id_pointing="all"):
36 if position_xy.shape[1] != 2: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true
37 raise ValueError(
38 f"Invalid position_xy shape (should be n_obj x 2) {position_xy.shape}"
39 )
41 # Setup interpolator
42 with h5py.File(filepath_psfmodel, "r") as fh5:
43 par_names = at.set_loading_dtypes(fh5["par_names"][...])
44 # n_psf_dim = len(par_names)
45 pointings_maps = fh5["map_pointings"]
46 # n_pointings = pointings_maps.attrs['n_pointings']
47 position_weights = psf_utils.get_position_weights(
48 position_xy[:, 0], position_xy[:, 1], pointings_maps
49 )
50 poly_coeffs = fh5["arr_pointings_polycoeffs"][...]
51 unseen_pointings = fh5["unseen_pointings"][...]
52 settings = {
53 key: at.set_loading_dtypes(fh5["settings"][key][...])
54 for key in fh5["settings"]
55 }
56 # set_unseen_to_mean = bool(fh5['set_unseen_to_mean'][...])
57 settings.setdefault("polynomial_type", "standard")
58 LOGGER.debug("polynomial_type={}".format(settings["polynomial_type"]))
60 regressor = Regressor(
61 poly_order=settings["poly_order"],
62 ridge_alpha=settings["ridge_alpha"],
63 polynomial_type=settings["polynomial_type"],
64 poly_coefficients=poly_coeffs,
65 unseen_pointings=unseen_pointings,
66 )
68 if id_pointing == "all": 68 ↛ 85line 68 didn't jump to line 85 because the condition on line 68 was always true
69 LOGGER.info(
70 f"prediction for cnn models n_pos={position_xy.shape[0]} id_pointing=all"
71 )
73 position_par_post, select_no_coverage = predict_psf(
74 position_xy, position_weights, regressor, settings
75 )
77 position_par_post = np.core.records.fromarrays(
78 position_par_post.T, names=",".join(par_names)
79 )
80 psf_utils.postprocess_catalog(position_par_post)
81 n_exposures = psf_utils.position_weights_to_nexp(position_weights)
82 yield position_par_post, select_no_coverage, n_exposures
84 else:
85 raise NotImplementedError(
86 "This feature is not yet implemented due to the polynomial coefficients"
87 " covariances which are a tiny bit tricky but definitely doable"
88 )