Coverage for src/ufig/io_util.py: 90%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 19:08 +0000

1# Copyright (C) 2015 ETH Zurich, Institute for Astronomy 

2 

3""" 

4Created on Nov 3, 2015 

5 

6author: jakeret 

7""" 

8 

9import os 

10 

11import h5py 

12import healpy as hp 

13import numpy as np 

14from astropy.io import fits 

15from pkg_resources import resource_filename 

16 

17DEFAULT_ROOT_PATH = "res/maps/" 

18 

19 

20def get_local_abs_path(path): 

21 if ("@" in path and ":/" in path) or os.path.isabs(path): 

22 abs_path = path 

23 else: 

24 parent = os.environ.get("SUBMIT_DIR", os.getcwd()) 

25 abs_path = os.path.join(parent, path) 

26 return abs_path 

27 

28 

29def get_abs_path( 

30 file_name, 

31 root_path=DEFAULT_ROOT_PATH, 

32 is_file=True, 

33 package_name="ufig", 

34): 

35 """ 

36 Resolves the absolute path for a file or a directory. 

37 

38 In case the input is treated as a file (is_file=True), the function tries the 

39 following steps: 

40 1) if file_name is already an absolute path, then just return it 

41 2) if root_path is an absolute path the path is simply concatenated 

42 3) checking in the ufig package structure 

43 4) using DarkSkySync 

44 

45 In case the input should be treated as a directory (is_file=False), the function 

46 tries the following steps: 

47 1) if file_name is already an absolute path, then just return it 

48 2) if root_path is an absolute path the path is simply concatenated 

49 3) checking in the ufig package structure 

50 

51 :returns path: local absolute path to the file or directory if possible 

52 """ 

53 

54 if os.path.isabs(file_name): 

55 if os.path.exists(file_name): 

56 return file_name 

57 else: 

58 raise OSError(f"Absolute file path not found: {file_name}") 

59 

60 if os.path.isabs(root_path): 

61 path = os.path.join(root_path, file_name) 

62 

63 else: 

64 resource_directory = resource_filename(package_name, root_path) 

65 path = os.path.join(resource_directory, file_name) 

66 

67 if os.path.exists(path): 

68 return path 

69 

70 if is_file: 70 ↛ 82line 70 didn't jump to line 82 because the condition on line 70 was always true

71 try: 

72 from darkskysync import DarkSkySync 

73 

74 dssync = DarkSkySync() 

75 path = dssync.load(root_path + file_name)[0] 

76 except Exception as errmsg: 

77 raise RuntimeError( 

78 f"DarkSkySync failed for path {root_path + file_name} \n {errmsg}" 

79 ) from None 

80 

81 else: 

82 raise ValueError( 

83 f"Unable to construct absolute, existing directory path from {file_name}" 

84 ) 

85 

86 return path 

87 

88 

89def load_from_hdf5(file_name, hdf5_keys, hdf5_path="", root_path=DEFAULT_ROOT_PATH): 

90 """ 

91 Load data stored in a HDF5-file. 

92 :param file_name: Name of the file. 

93 :param hdf5_keys: Keys of arrays to be loaded. 

94 :param hdf5_path: Path within HDF5-file appended to all keys. 

95 :param root_path: Relative or absolute root path. 

96 :return: Loaded arrays. 

97 """ 

98 

99 if str(hdf5_keys) == hdf5_keys: 

100 hdf5_keys = [hdf5_keys] 

101 return_null_entry = True 

102 else: 

103 return_null_entry = False 

104 

105 hdf5_keys = [hdf5_path + hdf5_key for hdf5_key in hdf5_keys] 

106 

107 path = get_abs_path(file_name, root_path=root_path) 

108 

109 with h5py.File(path, mode="r") as hdf5_file: 

110 hdf5_data = [hdf5_file[hdf5_key][...] for hdf5_key in hdf5_keys] 

111 

112 if return_null_entry: 

113 hdf5_data = hdf5_data[0] 

114 

115 return hdf5_data 

116 

117 

118def load_image_chunks(file_name, ext=0, dtype=np.float64, n_pix_per_row=100): 

119 with fits.open(file_name, memmap=True) as hdul: 

120 img = np.empty(hdul[ext].shape, dtype=dtype) 

121 n_chunks = int(np.ceil(hdul[ext].shape[0] / float(n_pix_per_row))) 

122 for ci in range(n_chunks): 

123 si, ei = ci * n_pix_per_row, (ci + 1) * n_pix_per_row 

124 img[si:ei, :] = hdul[ext].data[si:ei, :] 

125 

126 return img 

127 

128 

129def load_image(file_name, size_x, size_y, root_path=DEFAULT_ROOT_PATH, ext=0, **kwargs): 

130 """ 

131 Loads an image from stored in a fits file 

132 :param file_name: name of the file 

133 :param size_x: max x size 

134 :param size_y: max y size 

135 :param root_path: relative root path 

136 :param ext: fits extention 

137 """ 

138 path = get_abs_path(file_name, root_path) 

139 img = fits.getdata(filename=path, ext=ext, **kwargs) 

140 shape = img.shape 

141 

142 if shape[0] > size_y and shape[1] > size_x: 

143 img = img[:size_y, :size_x] 

144 elif shape[0] < size_y and shape[1] < size_x: 

145 raise ValueError("Loaded image is smaller than rendered image") 

146 

147 return img 

148 

149 

150def load_hpmap(file_name, root_path, ext=1): 

151 """ 

152 Loads a healpix map and returns it in the ring-scheme 

153 

154 :param file_name: name of the file 

155 :param root_path: relative root path 

156 :param ext: Extension of Healpix-maps (by default = 1) 

157 

158 """ 

159 path = get_abs_path(file_name, root_path) 

160 

161 header = fits.getheader(path, ext=ext) 

162 tfields = header["TFIELDS"] 

163 ttypes = [header["TTYPE" + str(i)] for i in range(1, tfields + 1)] 

164 

165 list_maps = fits.getdata(path, ext=ext) 

166 maps = [list_maps[ttype] for ttype in ttypes] 

167 

168 if header["ORDERING"] == "NESTED": 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true

169 nside = hp.get_nside(maps[0]) 

170 ring2nest_pixels = hp.ring2nest(nside, np.arange(12 * nside**2)) 

171 maps = [maps[i][ring2nest_pixels] for i in range(len(maps))] 

172 

173 if tfields == 1: 173 ↛ 176line 173 didn't jump to line 176 because the condition on line 173 was always true

174 return maps[0] 

175 else: 

176 return maps 

177 

178 

179def write_hpmap(maps, file_path, **kwargs): 

180 """ 

181 Writes Healpix maps ensuring a format that can be loaded easily with fits.getdata() 

182 and displayed properly with Aladin. 

183 

184 :param maps: 1 or 3 (given as a list) Healpix maps 

185 :param file_path: path where maps need to be stored 

186 """ 

187 

188 hp.write_map(file_path, maps, fits_IDL=False, nest=False, coord="C", **kwargs)