Coverage for src/ufig/psf_estimation/psf_utils.py: 88%

35 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1import numpy as np 

2 

3from ufig import mask_utils 

4 

5ERR_VAL = 999 

6 

7 

8def transform_forward(vec, scale): 

9 vec_transformed = (vec - scale[:, 0]) / scale[:, 1] 

10 return vec_transformed 

11 

12 

13def transform_inverse(vec_transformed, scale): 

14 vec = vec_transformed * scale[:, 1] + scale[:, 0] 

15 return vec 

16 

17 

18def position_weights_to_nexp(position_weights): 

19 n_exp = np.sum(position_weights > 0, axis=1).astype(np.uint16) 

20 return n_exp 

21 

22 

23def postprocess_catalog(cat): 

24 if "psf_flux_ratio_cnn" in cat.dtype.names: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true

25 cat["psf_flux_ratio_cnn"] = np.clip( 

26 cat["psf_flux_ratio_cnn"], a_min=0.0, a_max=1.0 

27 ) 

28 

29 

30def get_position_weights(x, y, pointings_maps): 

31 size_y, size_x = pointings_maps.shape 

32 

33 x_noedge = x.astype(np.int32) 

34 y_noedge = y.astype(np.int32) 

35 x_noedge[x_noedge >= size_x] = size_x - 1 

36 y_noedge[y_noedge >= size_y] = size_y - 1 

37 x_noedge[x_noedge < 0] = 0 

38 y_noedge[y_noedge < 0] = 0 

39 

40 n_pointings = pointings_maps.attrs["n_pointings"] 

41 

42 n_bit = 64 

43 

44 position_weights = mask_utils.decimal_integer_to_binary( 

45 n_bit, pointings_maps["bit1"][y_noedge, x_noedge], dtype_out=np.float64 

46 ) 

47 

48 for n in range(2, 6): 48 ↛ 66line 48 didn't jump to line 66 because the loop on line 48 didn't complete

49 n_pointings -= 64 

50 if n_pointings > 0: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true

51 position_weights = np.concatenate( 

52 ( 

53 position_weights, 

54 mask_utils.decimal_integer_to_binary( 

55 n_bit, 

56 pointings_maps[f"bit{str(n)}"][y_noedge, x_noedge], 

57 dtype_out=np.float64, 

58 ), 

59 ), 

60 axis=1, 

61 dtype=np.float64, 

62 ) 

63 else: 

64 break 

65 

66 norm = np.sum(np.array(position_weights), axis=1, keepdims=True) 

67 position_weights /= norm 

68 position_weights[norm[:, 0] == 0] = 0 

69 

70 return position_weights