Coverage for src/ufig/io_util.py: 90%

78 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 15:17 +0000

1# Copyright (C) 2015 ETH Zurich, Institute for Astronomy 

2 

3""" 

4Created on Nov 3, 2015 

5 

6author: jakeret 

7""" 

8 

9import os 

10 

11import h5py 

12import healpy as hp 

13import numpy as np 

14from astropy.io import fits 

15from pkg_resources import resource_filename 

16 

17DEFAULT_ROOT_PATH = "res/maps/" 

18 

19 

20def get_local_abs_path(path): 

21 if ("@" in path and ":/" in path) or os.path.isabs(path): 

22 abs_path = path 

23 else: 

24 parent = os.environ.get("SUBMIT_DIR", os.getcwd()) 

25 abs_path = os.path.join(parent, path) 

26 return abs_path 

27 

28 

29def get_abs_path( 

30 file_name, 

31 root_path=DEFAULT_ROOT_PATH, 

32 is_file=True, 

33 package_name="ufig", 

34): 

35 """ 

36 Resolves the absolute path for a file or a directory. 

37 

38 In case the input is treated as a file (is_file=True), the function tries the 

39 following steps: 

40 1) if file_name is already an absolute path, then just return it 

41 2) if root_path is an absolute path the path is simply concatenated 

42 3) checking in the ufig package structure 

43 4) using DarkSkySync 

44 

45 In case the input should be treated as a directory (is_file=False), the function 

46 tries the following steps: 

47 1) if file_name is already an absolute path, then just return it 

48 2) if root_path is an absolute path the path is simply concatenated 

49 3) checking in the ufig package structure 

50 

51 :returns path: local absolute path to the file or directory if possible 

52 """ 

53 

54 if os.path.isabs(file_name): 

55 if os.path.exists(file_name): 

56 return file_name 

57 else: 

58 raise OSError(f"Absolute file path not found: {file_name}") 

59 

60 if os.path.isabs(root_path): 

61 path = os.path.join(root_path, file_name) 

62 

63 else: 

64 resource_directory = resource_filename(package_name, root_path) 

65 path = os.path.join(resource_directory, file_name) 

66 

67 if os.path.exists(path): 

68 return path 

69 

70 if is_file: 70 ↛ 82line 70 didn't jump to line 82 because the condition on line 70 was always true

71 try: 

72 from darkskysync import DarkSkySync 

73 

74 dssync = DarkSkySync() 

75 path = dssync.load(root_path + file_name)[0] 

76 except Exception as errmsg: 

77 raise RuntimeError( 

78 f"DarkSkySync failed for path {root_path + file_name} \n {errmsg}" 

79 ) from None 

80 

81 else: 

82 raise ValueError( 

83 f"Unable to construct absolute, existing directory path from {file_name}" 

84 ) 

85 

86 return path 

87 

88 

89def load_from_hdf5(file_name, hdf5_keys, hdf5_path="", root_path=DEFAULT_ROOT_PATH): 

90 """ 

91 Load data stored in a HDF5-file. 

92 

93 :param file_name: Name of the file. 

94 :param hdf5_keys: Keys of arrays to be loaded. 

95 :param hdf5_path: Path within HDF5-file appended to all keys. 

96 :param root_path: Relative or absolute root path. 

97 :return: Loaded arrays. 

98 """ 

99 

100 if str(hdf5_keys) == hdf5_keys: 

101 hdf5_keys = [hdf5_keys] 

102 return_null_entry = True 

103 else: 

104 return_null_entry = False 

105 

106 hdf5_keys = [hdf5_path + hdf5_key for hdf5_key in hdf5_keys] 

107 

108 path = get_abs_path(file_name, root_path=root_path) 

109 

110 with h5py.File(path, mode="r") as hdf5_file: 

111 hdf5_data = [hdf5_file[hdf5_key][...] for hdf5_key in hdf5_keys] 

112 

113 if return_null_entry: 

114 hdf5_data = hdf5_data[0] 

115 

116 return hdf5_data 

117 

118 

119def load_image_chunks(file_name, ext=0, dtype=np.float64, n_pix_per_row=100): 

120 with fits.open(file_name, memmap=True) as hdul: 

121 img = np.empty(hdul[ext].shape, dtype=dtype) 

122 n_chunks = int(np.ceil(hdul[ext].shape[0] / float(n_pix_per_row))) 

123 for ci in range(n_chunks): 

124 si, ei = ci * n_pix_per_row, (ci + 1) * n_pix_per_row 

125 img[si:ei, :] = hdul[ext].data[si:ei, :] 

126 

127 return img 

128 

129 

130def load_image(file_name, size_x, size_y, root_path=DEFAULT_ROOT_PATH, ext=0, **kwargs): 

131 """ 

132 Loads an image from stored in a fits file 

133 

134 :param file_name: name of the file 

135 :param size_x: max x size 

136 :param size_y: max y size 

137 :param root_path: relative root path 

138 :param ext: fits extention 

139 """ 

140 path = get_abs_path(file_name, root_path) 

141 img = fits.getdata(filename=path, ext=ext, **kwargs) 

142 shape = img.shape 

143 

144 if shape[0] > size_y and shape[1] > size_x: 

145 img = img[:size_y, :size_x] 

146 elif shape[0] < size_y and shape[1] < size_x: 

147 raise ValueError("Loaded image is smaller than rendered image") 

148 

149 return img 

150 

151 

152def load_hpmap(file_name, root_path, ext=1): 

153 """ 

154 Loads a healpix map and returns it in the ring-scheme 

155 

156 :param file_name: name of the file 

157 :param root_path: relative root path 

158 :param ext: Extension of Healpix-maps (by default = 1) 

159 

160 """ 

161 path = get_abs_path(file_name, root_path) 

162 

163 header = fits.getheader(path, ext=ext) 

164 tfields = header["TFIELDS"] 

165 ttypes = [header["TTYPE" + str(i)] for i in range(1, tfields + 1)] 

166 

167 list_maps = fits.getdata(path, ext=ext) 

168 maps = [list_maps[ttype] for ttype in ttypes] 

169 

170 if header["ORDERING"] == "NESTED": 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true

171 nside = hp.get_nside(maps[0]) 

172 ring2nest_pixels = hp.ring2nest(nside, np.arange(12 * nside**2)) 

173 maps = [maps[i][ring2nest_pixels] for i in range(len(maps))] 

174 

175 if tfields == 1: 175 ↛ 178line 175 didn't jump to line 178 because the condition on line 175 was always true

176 return maps[0] 

177 else: 

178 return maps 

179 

180 

181def write_hpmap(maps, file_path, **kwargs): 

182 """ 

183 Writes Healpix maps ensuring a format that can be loaded easily with fits.getdata() 

184 and displayed properly with Aladin. 

185 

186 :param maps: 1 or 3 (given as a list) Healpix maps 

187 :param file_path: path where maps need to be stored 

188 """ 

189 

190 hp.write_map(file_path, maps, fits_IDL=False, nest=False, coord="C", **kwargs)