Coverage for src/ufig/io_util.py: 90%
78 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 15:17 +0000
1# Copyright (C) 2015 ETH Zurich, Institute for Astronomy
3"""
4Created on Nov 3, 2015
6author: jakeret
7"""
9import os
11import h5py
12import healpy as hp
13import numpy as np
14from astropy.io import fits
15from pkg_resources import resource_filename
17DEFAULT_ROOT_PATH = "res/maps/"
20def get_local_abs_path(path):
21 if ("@" in path and ":/" in path) or os.path.isabs(path):
22 abs_path = path
23 else:
24 parent = os.environ.get("SUBMIT_DIR", os.getcwd())
25 abs_path = os.path.join(parent, path)
26 return abs_path
29def get_abs_path(
30 file_name,
31 root_path=DEFAULT_ROOT_PATH,
32 is_file=True,
33 package_name="ufig",
34):
35 """
36 Resolves the absolute path for a file or a directory.
38 In case the input is treated as a file (is_file=True), the function tries the
39 following steps:
40 1) if file_name is already an absolute path, then just return it
41 2) if root_path is an absolute path the path is simply concatenated
42 3) checking in the ufig package structure
43 4) using DarkSkySync
45 In case the input should be treated as a directory (is_file=False), the function
46 tries the following steps:
47 1) if file_name is already an absolute path, then just return it
48 2) if root_path is an absolute path the path is simply concatenated
49 3) checking in the ufig package structure
51 :returns path: local absolute path to the file or directory if possible
52 """
54 if os.path.isabs(file_name):
55 if os.path.exists(file_name):
56 return file_name
57 else:
58 raise OSError(f"Absolute file path not found: {file_name}")
60 if os.path.isabs(root_path):
61 path = os.path.join(root_path, file_name)
63 else:
64 resource_directory = resource_filename(package_name, root_path)
65 path = os.path.join(resource_directory, file_name)
67 if os.path.exists(path):
68 return path
70 if is_file: 70 ↛ 82line 70 didn't jump to line 82 because the condition on line 70 was always true
71 try:
72 from darkskysync import DarkSkySync
74 dssync = DarkSkySync()
75 path = dssync.load(root_path + file_name)[0]
76 except Exception as errmsg:
77 raise RuntimeError(
78 f"DarkSkySync failed for path {root_path + file_name} \n {errmsg}"
79 ) from None
81 else:
82 raise ValueError(
83 f"Unable to construct absolute, existing directory path from {file_name}"
84 )
86 return path
89def load_from_hdf5(file_name, hdf5_keys, hdf5_path="", root_path=DEFAULT_ROOT_PATH):
90 """
91 Load data stored in a HDF5-file.
93 :param file_name: Name of the file.
94 :param hdf5_keys: Keys of arrays to be loaded.
95 :param hdf5_path: Path within HDF5-file appended to all keys.
96 :param root_path: Relative or absolute root path.
97 :return: Loaded arrays.
98 """
100 if str(hdf5_keys) == hdf5_keys:
101 hdf5_keys = [hdf5_keys]
102 return_null_entry = True
103 else:
104 return_null_entry = False
106 hdf5_keys = [hdf5_path + hdf5_key for hdf5_key in hdf5_keys]
108 path = get_abs_path(file_name, root_path=root_path)
110 with h5py.File(path, mode="r") as hdf5_file:
111 hdf5_data = [hdf5_file[hdf5_key][...] for hdf5_key in hdf5_keys]
113 if return_null_entry:
114 hdf5_data = hdf5_data[0]
116 return hdf5_data
119def load_image_chunks(file_name, ext=0, dtype=np.float64, n_pix_per_row=100):
120 with fits.open(file_name, memmap=True) as hdul:
121 img = np.empty(hdul[ext].shape, dtype=dtype)
122 n_chunks = int(np.ceil(hdul[ext].shape[0] / float(n_pix_per_row)))
123 for ci in range(n_chunks):
124 si, ei = ci * n_pix_per_row, (ci + 1) * n_pix_per_row
125 img[si:ei, :] = hdul[ext].data[si:ei, :]
127 return img
130def load_image(file_name, size_x, size_y, root_path=DEFAULT_ROOT_PATH, ext=0, **kwargs):
131 """
132 Loads an image from stored in a fits file
134 :param file_name: name of the file
135 :param size_x: max x size
136 :param size_y: max y size
137 :param root_path: relative root path
138 :param ext: fits extention
139 """
140 path = get_abs_path(file_name, root_path)
141 img = fits.getdata(filename=path, ext=ext, **kwargs)
142 shape = img.shape
144 if shape[0] > size_y and shape[1] > size_x:
145 img = img[:size_y, :size_x]
146 elif shape[0] < size_y and shape[1] < size_x:
147 raise ValueError("Loaded image is smaller than rendered image")
149 return img
152def load_hpmap(file_name, root_path, ext=1):
153 """
154 Loads a healpix map and returns it in the ring-scheme
156 :param file_name: name of the file
157 :param root_path: relative root path
158 :param ext: Extension of Healpix-maps (by default = 1)
160 """
161 path = get_abs_path(file_name, root_path)
163 header = fits.getheader(path, ext=ext)
164 tfields = header["TFIELDS"]
165 ttypes = [header["TTYPE" + str(i)] for i in range(1, tfields + 1)]
167 list_maps = fits.getdata(path, ext=ext)
168 maps = [list_maps[ttype] for ttype in ttypes]
170 if header["ORDERING"] == "NESTED": 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true
171 nside = hp.get_nside(maps[0])
172 ring2nest_pixels = hp.ring2nest(nside, np.arange(12 * nside**2))
173 maps = [maps[i][ring2nest_pixels] for i in range(len(maps))]
175 if tfields == 1: 175 ↛ 178line 175 didn't jump to line 178 because the condition on line 175 was always true
176 return maps[0]
177 else:
178 return maps
181def write_hpmap(maps, file_path, **kwargs):
182 """
183 Writes Healpix maps ensuring a format that can be loaded easily with fits.getdata()
184 and displayed properly with Aladin.
186 :param maps: 1 or 3 (given as a list) Healpix maps
187 :param file_path: path where maps need to be stored
188 """
190 hp.write_map(file_path, maps, fits_IDL=False, nest=False, coord="C", **kwargs)