Coverage for src/ufig/psf_estimation/psf_estimation_coadd_cnn.py: 90%

36 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1import h5py 

2import numpy as np 

3from cosmic_toolbox import arraytools as at 

4from cosmic_toolbox import logger 

5 

6from ufig.psf_estimation import psf_utils 

7from ufig.psf_estimation.tiled_regressor import ( 

8 TiledRobustPolynomialRegressor as Regressor, 

9) 

10 

11LOGGER = logger.get_logger(__file__) 

12 

13 

14def predict_psf(position_xy, position_weights, regressor, settings, n_per_chunk=1000): 

15 position_xy_transformed = psf_utils.transform_forward( 

16 position_xy, scale=settings["scale_pos"] 

17 ) 

18 position_xy_transformed_weights = np.concatenate( 

19 [position_xy_transformed, position_weights], axis=1 

20 ) 

21 position_par_post = regressor.predict( 

22 position_xy_transformed_weights, batch_size=n_per_chunk 

23 ) 

24 position_par_post[:] = psf_utils.transform_inverse( 

25 position_par_post, settings["scale_par"] 

26 ) 

27 select_no_coverage = (position_weights.sum(axis=1) == 0) | np.any( 

28 ~np.isfinite(position_par_post), axis=1 

29 ) 

30 position_par_post[select_no_coverage] = 0 

31 

32 return position_par_post, select_no_coverage 

33 

34 

35def predict_psf_with_file(position_xy, filepath_psfmodel, id_pointing="all"): 

36 if position_xy.shape[1] != 2: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true

37 raise ValueError( 

38 f"Invalid position_xy shape (should be n_obj x 2) {position_xy.shape}" 

39 ) 

40 

41 # Setup interpolator 

42 with h5py.File(filepath_psfmodel, "r") as fh5: 

43 par_names = at.set_loading_dtypes(fh5["par_names"][...]) 

44 # n_psf_dim = len(par_names) 

45 pointings_maps = fh5["map_pointings"] 

46 # n_pointings = pointings_maps.attrs['n_pointings'] 

47 position_weights = psf_utils.get_position_weights( 

48 position_xy[:, 0], position_xy[:, 1], pointings_maps 

49 ) 

50 poly_coeffs = fh5["arr_pointings_polycoeffs"][...] 

51 unseen_pointings = fh5["unseen_pointings"][...] 

52 settings = { 

53 key: at.set_loading_dtypes(fh5["settings"][key][...]) 

54 for key in fh5["settings"] 

55 } 

56 # set_unseen_to_mean = bool(fh5['set_unseen_to_mean'][...]) 

57 settings.setdefault("polynomial_type", "standard") 

58 LOGGER.debug("polynomial_type={}".format(settings["polynomial_type"])) 

59 

60 regressor = Regressor( 

61 poly_order=settings["poly_order"], 

62 ridge_alpha=settings["ridge_alpha"], 

63 polynomial_type=settings["polynomial_type"], 

64 poly_coefficients=poly_coeffs, 

65 unseen_pointings=unseen_pointings, 

66 ) 

67 

68 if id_pointing == "all": 68 ↛ 85line 68 didn't jump to line 85 because the condition on line 68 was always true

69 LOGGER.info( 

70 f"prediction for cnn models n_pos={position_xy.shape[0]} id_pointing=all" 

71 ) 

72 

73 position_par_post, select_no_coverage = predict_psf( 

74 position_xy, position_weights, regressor, settings 

75 ) 

76 

77 position_par_post = np.core.records.fromarrays( 

78 position_par_post.T, names=",".join(par_names) 

79 ) 

80 psf_utils.postprocess_catalog(position_par_post) 

81 n_exposures = psf_utils.position_weights_to_nexp(position_weights) 

82 yield position_par_post, select_no_coverage, n_exposures 

83 

84 else: 

85 raise NotImplementedError( 

86 "This feature is not yet implemented due to the polynomial coefficients" 

87 " covariances which are a tiny bit tricky but definitely doable" 

88 )