Coverage for src/ufig/plugins/write_catalog.py: 85%

53 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1# Copyright (C) 2019 ETH Zurich, Institute for Particle Physics and Astrophysics 

2 

3""" 

4Created on Aug 2021 

5author: Tomasz Kacprzak 

6""" 

7 

8import numpy as np 

9from cosmic_toolbox import arraytools as at 

10from cosmic_toolbox import file_utils, logger 

11from ivy.plugin.base_plugin import BasePlugin 

12 

13LOGGER = logger.get_logger(__file__) 

14 

15 

def catalog_to_rec(catalog):
    """
    Convert a column-oriented catalog object into a NumPy structured array.

    :param catalog: object exposing ``columns`` (an iterable of column names)
        and, for every name in it, an attribute holding a NumPy array whose
        first axis runs over the objects.
    :return: structured array with one field per column. Columns of shape
        ``(n,)`` and ``(n, 1)`` become scalar fields; columns with further
        trailing dimensions become sub-array fields of shape ``col.shape[1:]``.
        A catalog without columns yields an empty array with no fields.
    """
    # Materialise the (name, array) pairs once; they are needed both for
    # building the dtype and for copying the data.
    columns = [(col_name, getattr(catalog, col_name)) for col_name in catalog.columns]

    # get dtype first
    dtype_list = []
    n_obj = 0  # stays 0 when the catalog has no columns
    for col_name, col in columns:
        # as before, the length is taken from the last column visited
        n_obj = len(col)
        trailing_shape = col.shape[1:]
        if trailing_shape in ((), (1,)):
            # (n,) and (n, 1) columns are stored as plain scalar fields.
            # Spelling this out avoids NumPy's deprecated "(type, 1)"
            # shorthand, which emits a FutureWarning.
            dtype_list.append((col_name, col.dtype))
        else:
            dtype_list.append((col_name, col.dtype, trailing_shape))

    # create empty array
    rec = np.empty(n_obj, dtype=np.dtype(dtype_list))

    # copy columns to array
    for col_name, col in columns:
        # (n, 1) columns are flattened to match their scalar field
        rec[col_name] = col.ravel() if col.shape[1:] == (1,) else col

    return rec

41 

42 

class Plugin(BasePlugin):
    """
    Write the galaxy and star catalogs held in the context to file.

    For each of ``galaxies`` and ``stars`` present in ``self.ctx``, the catalog
    is converted with ``catalog_to_rec`` and written with
    ``file_utils.write_to_hdf`` to the path registered for the current filter
    in ``par.galaxy_catalog_name_dict`` / ``par.star_catalog_name_dict``.
    The galaxy catalog is additionally extended, on a best-effort basis, with
    the columns ``e_abs``, ``bkg_noise_amp`` and ``bkg_noise_std``; a column
    that cannot be computed is skipped with a logged warning.
    """

    def __call__(self):
        """Write whichever of the galaxy and star catalogs the context holds."""
        par = self.ctx.parameters

        # write catalogs
        if "galaxies" in self.ctx:
            filepath_out = par.galaxy_catalog_name_dict[self.ctx.current_filter]

            cat = catalog_to_rec(self.ctx.galaxies)
            # add absolute ellipticities
            try:
                cat = at.add_cols(
                    cat, ["e_abs"], data=np.sqrt(cat["e1"] ** 2 + cat["e2"] ** 2)
                )
            except (ValueError, KeyError) as e:
                LOGGER.warning(f"e_abs could not be calculated: {e}")
            # add noise levels
            try:
                cat = at.add_cols(
                    cat, ["bkg_noise_amp"], data=np.ones(len(cat)) * par.bkg_noise_amp
                )
            except AttributeError as e:
                LOGGER.warning(f"bkg_noise_amp could not be calculated: {e}")

            try:
                # only done when the catalog carries neither "ra" nor "dec"
                if "ra" not in cat.dtype.names and "dec" not in cat.dtype.names:
                    y = np.array(cat["y"], dtype=int)
                    x = np.array(cat["x"], dtype=int)
                    if hasattr(par.bkg_noise_std, "shape"):
                        # array-like noise level: look it up at the
                        # integer-cast (y, x) position of every object
                        cat = at.add_cols(
                            cat, ["bkg_noise_std"], data=par.bkg_noise_std[y, x]
                        )
                    else:
                        # no shape attribute: the value is passed on as is
                        cat = at.add_cols(
                            cat, ["bkg_noise_std"], data=par.bkg_noise_std
                        )
            except (ValueError, KeyError, AttributeError) as e:
                LOGGER.warning(f"bkg_noise_std could not be calculated: {e}")
            file_utils.write_to_hdf(filepath_out, cat)

        if "stars" in self.ctx:
            # Read the filter here instead of reusing a local bound in the
            # galaxy branch, so that a context holding stars but no galaxies
            # does not fail on an unbound variable.
            filepath_out = par.star_catalog_name_dict[self.ctx.current_filter]
            cat = catalog_to_rec(self.ctx.stars)
            file_utils.write_to_hdf(filepath_out, cat)

    def __str__(self):
        """Return the short description of this plugin."""
        return "write ucat catalog to file"