Coverage for src/ufig/io_util.py: 90%
78 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 19:08 +0000
1# Copyright (C) 2015 ETH Zurich, Institute for Astronomy
3"""
4Created on Nov 3, 2015
6author: jakeret
7"""
9import os
11import h5py
12import healpy as hp
13import numpy as np
14from astropy.io import fits
15from pkg_resources import resource_filename
17DEFAULT_ROOT_PATH = "res/maps/"
20def get_local_abs_path(path):
21 if ("@" in path and ":/" in path) or os.path.isabs(path):
22 abs_path = path
23 else:
24 parent = os.environ.get("SUBMIT_DIR", os.getcwd())
25 abs_path = os.path.join(parent, path)
26 return abs_path
29def get_abs_path(
30 file_name,
31 root_path=DEFAULT_ROOT_PATH,
32 is_file=True,
33 package_name="ufig",
34):
35 """
36 Resolves the absolute path for a file or a directory.
38 In case the input is treated as a file (is_file=True), the function tries the
39 following steps:
40 1) if file_name is already an absolute path, then just return it
41 2) if root_path is an absolute path the path is simply concatenated
42 3) checking in the ufig package structure
43 4) using DarkSkySync
45 In case the input should be treated as a directory (is_file=False), the function
46 tries the following steps:
47 1) if file_name is already an absolute path, then just return it
48 2) if root_path is an absolute path the path is simply concatenated
49 3) checking in the ufig package structure
51 :returns path: local absolute path to the file or directory if possible
52 """
54 if os.path.isabs(file_name):
55 if os.path.exists(file_name):
56 return file_name
57 else:
58 raise OSError(f"Absolute file path not found: {file_name}")
60 if os.path.isabs(root_path):
61 path = os.path.join(root_path, file_name)
63 else:
64 resource_directory = resource_filename(package_name, root_path)
65 path = os.path.join(resource_directory, file_name)
67 if os.path.exists(path):
68 return path
70 if is_file: 70 ↛ 82line 70 didn't jump to line 82 because the condition on line 70 was always true
71 try:
72 from darkskysync import DarkSkySync
74 dssync = DarkSkySync()
75 path = dssync.load(root_path + file_name)[0]
76 except Exception as errmsg:
77 raise RuntimeError(
78 f"DarkSkySync failed for path {root_path + file_name} \n {errmsg}"
79 ) from None
81 else:
82 raise ValueError(
83 f"Unable to construct absolute, existing directory path from {file_name}"
84 )
86 return path
89def load_from_hdf5(file_name, hdf5_keys, hdf5_path="", root_path=DEFAULT_ROOT_PATH):
90 """
91 Load data stored in a HDF5-file.
92 :param file_name: Name of the file.
93 :param hdf5_keys: Keys of arrays to be loaded.
94 :param hdf5_path: Path within HDF5-file appended to all keys.
95 :param root_path: Relative or absolute root path.
96 :return: Loaded arrays.
97 """
99 if str(hdf5_keys) == hdf5_keys:
100 hdf5_keys = [hdf5_keys]
101 return_null_entry = True
102 else:
103 return_null_entry = False
105 hdf5_keys = [hdf5_path + hdf5_key for hdf5_key in hdf5_keys]
107 path = get_abs_path(file_name, root_path=root_path)
109 with h5py.File(path, mode="r") as hdf5_file:
110 hdf5_data = [hdf5_file[hdf5_key][...] for hdf5_key in hdf5_keys]
112 if return_null_entry:
113 hdf5_data = hdf5_data[0]
115 return hdf5_data
118def load_image_chunks(file_name, ext=0, dtype=np.float64, n_pix_per_row=100):
119 with fits.open(file_name, memmap=True) as hdul:
120 img = np.empty(hdul[ext].shape, dtype=dtype)
121 n_chunks = int(np.ceil(hdul[ext].shape[0] / float(n_pix_per_row)))
122 for ci in range(n_chunks):
123 si, ei = ci * n_pix_per_row, (ci + 1) * n_pix_per_row
124 img[si:ei, :] = hdul[ext].data[si:ei, :]
126 return img
129def load_image(file_name, size_x, size_y, root_path=DEFAULT_ROOT_PATH, ext=0, **kwargs):
130 """
131 Loads an image from stored in a fits file
132 :param file_name: name of the file
133 :param size_x: max x size
134 :param size_y: max y size
135 :param root_path: relative root path
136 :param ext: fits extention
137 """
138 path = get_abs_path(file_name, root_path)
139 img = fits.getdata(filename=path, ext=ext, **kwargs)
140 shape = img.shape
142 if shape[0] > size_y and shape[1] > size_x:
143 img = img[:size_y, :size_x]
144 elif shape[0] < size_y and shape[1] < size_x:
145 raise ValueError("Loaded image is smaller than rendered image")
147 return img
150def load_hpmap(file_name, root_path, ext=1):
151 """
152 Loads a healpix map and returns it in the ring-scheme
154 :param file_name: name of the file
155 :param root_path: relative root path
156 :param ext: Extension of Healpix-maps (by default = 1)
158 """
159 path = get_abs_path(file_name, root_path)
161 header = fits.getheader(path, ext=ext)
162 tfields = header["TFIELDS"]
163 ttypes = [header["TTYPE" + str(i)] for i in range(1, tfields + 1)]
165 list_maps = fits.getdata(path, ext=ext)
166 maps = [list_maps[ttype] for ttype in ttypes]
168 if header["ORDERING"] == "NESTED": 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 nside = hp.get_nside(maps[0])
170 ring2nest_pixels = hp.ring2nest(nside, np.arange(12 * nside**2))
171 maps = [maps[i][ring2nest_pixels] for i in range(len(maps))]
173 if tfields == 1: 173 ↛ 176line 173 didn't jump to line 176 because the condition on line 173 was always true
174 return maps[0]
175 else:
176 return maps
179def write_hpmap(maps, file_path, **kwargs):
180 """
181 Writes Healpix maps ensuring a format that can be loaded easily with fits.getdata()
182 and displayed properly with Aladin.
184 :param maps: 1 or 3 (given as a list) Healpix maps
185 :param file_path: path where maps need to be stored
186 """
188 hp.write_map(file_path, maps, fits_IDL=False, nest=False, coord="C", **kwargs)