Coverage for src / cosmic_toolbox / arraytools.py: 83%
466 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-31 12:38 +0000
1"""
2Array utilities for working with numpy structured arrays and HDF5 files.
4Provides functions for:
5- Converting between arrays, recarrays, dicts, dataframes, and classes
6- Adding, removing, and manipulating columns in structured arrays
7- Reading and writing HDF5 files with various storage formats
8- Handling NaN/Inf values and dtype conversions
10"""
12import os
14import h5py
15import numpy as np
16import pandas as pd
17import six
19from cosmic_toolbox import file_utils, logger
21LOGGER = logger.get_logger(__file__)
def view_fields(rec, names):
    """
    Return a view of a structured array restricted to the given fields.

    No data is copied: the result shares memory with `rec`.

    :param rec: numpy structured array
    :param names: collection of field names to keep
    :return: view of `rec` containing only the requested fields
    """
    fields = rec.dtype.fields
    sub_dtype = np.dtype(
        dict(
            names=names,
            formats=[fields[name][0] for name in names],
            offsets=[fields[name][1] for name in names],
            itemsize=rec.dtype.itemsize,
        )
    )
    return rec.view(sub_dtype)
def delete_cols(rec, col_names):
    """
    Return a copy of a recarray without the given columns.

    Column names not present in `rec` are silently ignored.

    :param rec: numpy recarray
    :param col_names: list of names of the columns to delete
    :return: numpy recarray
    """
    keep = [name for name in rec.dtype.names if name not in col_names]

    if not keep:
        # no columns left: empty recarray with an empty dtype
        return np.array([], dtype=[])

    return rec[keep].copy()
def delete_columns(rec, col_names):
    """
    Alias of :func:`delete_cols`, kept for backwards compatibility.

    :param rec: numpy recarray
    :param col_names: list of names of the columns to delete
    :return: numpy recarray
    """
    return delete_cols(rec, col_names)
def add_cols(rec, names, shapes=None, data=0, dtype=None):
    """
    Return a copy of `rec` extended with new columns.

    By default the new columns are filled with zeros. If `data` is a 2-D
    numpy array, row `i` of `data` fills the i-th new column; otherwise
    `data` is broadcast into every new column.

    :param rec: numpy recarray
    :param names: list of names for the columns
    :param shapes: list of shapes for the columns
    :param data: data to fill the columns with
    :param dtype: dtype of the columns
    :return: numpy recarray
    """
    # a 2-D array means one row of `data` per new column
    per_column_data = isinstance(data, np.ndarray) and data.ndim == 2

    names = [str(name) for name in names]
    extra_dtype = get_dtype(names, shapes=shapes, main=dtype)
    combined = np.dtype(rec.dtype.descr + extra_dtype.descr)
    out = np.empty(rec.shape, dtype=combined)

    # copy over existing columns, then fill the new ones
    for field in rec.dtype.fields:
        out[field] = rec[field]
    for i, name in enumerate(extra_dtype.fields):
        if per_column_data:
            out[name] = data[i]
        else:
            out[name] = np.array(data).astype(dtype)

    return out
def ensure_cols(rec, names, shapes=None, data=0):
    """
    Ensure columns exist in a recarray, adding them if missing.

    :param rec: numpy recarray
    :param names: list of column names to ensure
    :param shapes: list of shapes for the columns
    :param data: data to fill new columns with
    :return: numpy recarray with ensured columns (the input itself if
        nothing had to be added)
    """
    names = [str(name) for name in names]
    missing = [
        name for name in names if get_dtype([name]).names[0] not in rec.dtype.names
    ]

    if not missing:
        return rec

    return add_cols(rec, missing, shapes=shapes, data=data)
def arr2rec(arr, names):
    """
    Convert a plain numpy array to a numpy structured array.

    :param arr: numpy array (1-D input is treated as a single row)
    :param names: list of names for the columns
    :return: numpy structured array
    """
    arr = np.atleast_2d(arr)
    n_rows, n_cols = arr.shape

    # every column keeps the dtype of the corresponding slice
    dtype = dict(names=names, formats=[arr[:, i].dtype for i in range(n_cols)])
    rec = np.zeros(n_rows, dtype=dtype)
    for i, name in enumerate(names):
        rec[name] = arr[:, i]
    return rec
def rec2arr(rec, return_names=False):
    """
    Convert a numpy structured array to a plain 2-D numpy array.

    :param rec: numpy structured array
    :param return_names: if True, also return the names of the columns
    :return: numpy array (and the column names if requested)

    Example
    -------
    >>> rec = np.array([(1, 4), (2, 4), (3, 4)],
                       dtype=[('a', '<i8'), ('b', '<i8')])
    >>> rec2arr(rec)
    array([[1, 4], [2, 4], [3, 4]])
    >>> arr, names = rec2arr(rec, return_names=True)
    >>> names
    ('a', 'b')
    """
    names = rec.dtype.names
    # stack columns as rows, then transpose to rows x columns
    arr = np.array([rec[name] for name in names]).T
    if return_names:
        return arr, names
    return arr
def dict2rec(d):
    """
    Convert a dictionary of arrays/lists/scalars to a numpy structured array.

    Scalars are broadcast to the common length; the length is taken from the
    first array-valued entry (1 if the dict holds only scalars).

    :param d: Dictionary with arrays/lists/scalars as values
    :return: numpy structured array
    """
    # normalize iterables (but not strings) to numpy arrays
    d = {
        k: np.array(v) if hasattr(v, "__iter__") and not isinstance(v, str) else v
        for k, v in d.items()
    }

    # number of rows: leading dimension of the first array entry, else 1
    sizes = [v.shape[0] for v in d.values() if hasattr(v, "shape") and v.shape]
    size = sizes[0] if sizes else 1

    # build the field descriptions
    dtype_list = []
    for key, val in d.items():
        if hasattr(val, "dtype"):
            if val.ndim > 1:
                dtype_list.append((key, val.dtype, val.shape[1:]))
            else:
                dtype_list.append((key, val.dtype))
        else:
            # scalar value: infer its dtype via numpy
            dtype_list.append((key, np.array(val).dtype))

    rec = np.empty(size, dtype=dtype_list)
    for key, val in d.items():
        # scalars broadcast to all rows automatically
        rec[key] = val
    return rec
def rec2dict(rec):
    """
    Convert a numpy structured array to a dictionary of column arrays.

    :param rec: numpy structured array
    :return: dictionary mapping field name to column array

    Example
    -------
    >>> rec = np.array([(1, 4), (2, 4), (3, 4)],
                       dtype=[('a', '<i8'), ('b', '<i8')])
    >>> rec2dict(rec)
    {'a': array([1, 2, 3]), 'b': array([4, 4, 4])}
    """
    return {key: rec[key] for key in rec.dtype.names}
def dict2class(d):
    """
    Convert a dictionary to an object with one attribute per key.

    :param d: dictionary
    :return: object exposing the dict entries as attributes

    Example
    -------
    >>> c = dict2class({'a': [1, 2, 3], 'b': 4})
    >>> c.a
    [1, 2, 3]
    >>> c.b
    4
    """
    class C:
        pass

    obj = C()
    for key, value in d.items():
        setattr(obj, key, value)
    return obj
def rec2class(rec):
    """
    Convert a numpy structured array to an object with one attribute per field.

    :param rec: numpy structured array
    :return: object exposing the columns as attributes
    """
    as_dict = rec2dict(rec)
    return dict2class(as_dict)
def class2dict(c):
    """
    Convert an object's attributes to a dictionary.

    :param c: object (class instance)
    :return: dictionary of its instance attributes
    """
    return vars(c)
def class2rec(c):
    """
    Convert an object's attributes to a numpy structured array.

    :param c: object (class instance)
    :return: numpy structured array
    """
    as_dict = class2dict(c)
    return dict2rec(as_dict)
def pd2rec(df):
    """
    Convert a pandas dataframe to a numpy structured array.

    The index is dropped; only the data columns are kept.

    :param df: pandas dataframe
    :return: numpy structured array (recarray)
    """
    rec = df.to_records(index=False)
    return rec
def rec2pd(rec):
    """
    Convert a numpy structured array to a pandas DataFrame.

    Multi-dimensional columns are flattened into scalar columns with
    suffixes _0, _1, etc.

    :param rec: numpy structured array
    :return: pandas DataFrame
    """
    columns = {}
    for name in rec.dtype.names:
        field_shape = rec.dtype[name].shape
        if field_shape:
            # flatten a multi-dimensional field into name_0, name_1, ...
            for i in range(field_shape[0]):
                columns[f"{name}_{i}"] = rec[name][:, i]
        else:
            columns[name] = rec[name]
    return pd.DataFrame(columns)
def get_nan_mask(rec):
    """
    Get a boolean mask marking rows that contain at least one NaN.

    :param rec: numpy structured array (all fields must support np.isnan)
    :return: boolean numpy array, True where a row has a NaN
    """
    mask = np.zeros(len(rec), dtype=bool)
    for name in rec.dtype.names:
        mask = mask | np.isnan(rec[name])
    return mask
def remove_nans(rec):
    """
    Drop all rows of `rec` that contain at least one NaN.

    :param rec: numpy structured array
    :return: numpy structured array without NaN rows
    """
    return rec[~get_nan_mask(rec)]
def get_inf_mask(rec):
    """
    Get a boolean mask marking rows that contain at least one inf.

    :param rec: numpy structured array (all fields must support np.isinf)
    :return: boolean numpy array, True where a row has an inf
    """
    mask = np.zeros(len(rec), dtype=bool)
    for name in rec.dtype.names:
        mask = mask | np.isinf(rec[name])
    return mask
def remove_infs(rec):
    """
    Drop all rows of `rec` that contain at least one inf.

    :param rec: numpy structured array
    :return: numpy structured array without inf rows
    """
    return rec[~get_inf_mask(rec)]
def get_finite_mask(rec):
    """
    Get a boolean mask marking fully finite rows (no NaNs, no infs).

    :param rec: numpy structured array
    :return: boolean numpy array, True where a row is entirely finite
    """
    bad = get_inf_mask(rec) | get_nan_mask(rec)
    return ~bad
def select_finite(rec):
    """
    Drop all rows of `rec` that contain NaNs or infs.

    :param rec: numpy structured array
    :return: numpy structured array with only finite rows
    """
    without_nans = remove_nans(rec)
    return remove_infs(without_nans)
def arr_to_rec(arr, dtype):
    """
    Convert a numpy array to a numpy structured array given its dtype.

    :param arr: numpy array (rows are records)
    :param dtype: dtype of the structured array
    :return: numpy structured array (recarray)
    """
    # np.core.records was removed in numpy 2.0; np.rec is the public alias
    return np.rec.fromarrays(np.array(arr).transpose(), dtype=dtype)
def get_dtype_of_list(lst):
    """
    Get the dtype shared by all elements of a list (must be uniform).

    :param lst: list of arrays
    :return: numpy dtype of the first element
    :raises AssertionError: if not all elements have the same dtype
    """
    arr = np.array(lst)
    dtype = arr[0].dtype

    if len(arr) == 1:
        return dtype

    uniform = all(element.dtype == dtype for element in arr[1:])
    assert uniform, "Not all entries of the list have the same dtype"

    return dtype
def new_array(n_rows, columns, ints=None, float_dtype=np.float64, int_dtype=np.int64):
    """
    Create a new zero-initialized structured array with the given columns.

    :param n_rows: number of rows
    :param columns: list of column names
    :param ints: list of column names that should be integers
    :param float_dtype: dtype for float columns
    :param int_dtype: dtype for integer columns
    :return: numpy structured array initialized with zeros
    """
    int_columns = set(ints) if ints is not None else set()
    formats = [
        int_dtype if col in int_columns else float_dtype for col in columns
    ]
    return np.zeros(n_rows, dtype=np.dtype(list(zip(columns, formats))))
def get_dtype(columns, main="f8", shapes=None):
    """
    Create a numpy dtype from column names.

    Column names can embed a dtype specification as 'name:dtype';
    columns without one get the `main` dtype.

    :param columns: list of column names (optionally with ':dtype' suffix)
    :param main: default dtype for columns without explicit dtype
    :param shapes: list of shapes for each column
    :return: numpy dtype
    """
    if main is None:
        main = "f8"
    elif isinstance(main, type):
        # allow e.g. main=float by using the type's name
        main = main.__name__

    if shapes is None:
        shapes = [() for _ in columns]

    names = []
    dtypes = []
    for col in columns:
        if ":" in col:
            name, dt = col.split(":")
        else:
            name, dt = col, main
        names.append(str(name))
        dtypes.append(str(dt))

    return np.dtype(list(zip(names, dtypes, shapes)))
def get_storing_dtypes(dtype_list):
    """
    Convert a dtype list for HDF5 storage (unicode to bytes).

    :param dtype_list: list of (name, dtype, ...) tuples
    :return: list of storage-compatible dtype tuples
    """
    # The former six.PY2 branch was dead code: this module uses f-strings and
    # is therefore Python-3 only. Unicode ("<U") columns cannot be written by
    # h5py, so they are stored as fixed-width bytes ("|S").
    return [(dt[0], dt[1].replace("<U", "|S")) + dt[2:] for dt in dtype_list]
def get_loading_dtypes(dtype_list):
    """
    Convert a dtype list for loading from HDF5 (bytes to unicode).

    :param dtype_list: list of (name, dtype, ...) tuples; the dtype entry may
        be a (str, metadata) tuple as reported by h5py >= 2.10
    :return: list of Python-compatible dtype tuples
    """
    # The former six.PY2 branch was dead code: this module uses f-strings and
    # is therefore Python-3 only.
    dtypes = []
    for dt in dtype_list:
        # h5py >= 2.10 may report the dtype as a (str, metadata) tuple
        descr = dt[1][0] if isinstance(dt[1], tuple) else dt[1]
        dtypes.append((dt[0], descr.replace("|S", "<U")) + dt[2:])
    return dtypes
def set_storing_dtypes(arr):
    """
    Convert values for HDF5 storage (unicode strings to bytes).

    :param arr: numpy array, string, list/tuple, or scalar
    :return: storage-compatible version of the input
    """
    # The former six.PY2 early-return was dead code: this module uses
    # f-strings and is therefore Python-3 only.

    # numpy array
    if hasattr(arr, "dtype"):
        if len(arr.dtype) == 0:
            # plain ndarray: a single dtype string
            dtypes_store = arr.dtype.str.replace("<U", "|S")
        else:
            # structured array: convert field by field
            dtypes_store = get_storing_dtypes(arr.dtype.descr)
        return arr.astype(dtypes_store)

    # single string
    if isinstance(arr, str):
        return arr.encode("utf-8")

    # list or tuple: encode string elements, keep everything else
    try:
        arr_fixed = [None] * len(arr)
        for i, x in enumerate(arr):
            arr_fixed[i] = x.encode("utf-8") if isinstance(x, str) else x
        return arr_fixed
    # single number or anything non-sized: return unchanged
    except Exception:
        return arr
def set_loading_dtypes(arr):
    """
    Convert values after loading from HDF5 (bytes to unicode strings).

    :param arr: numpy array, bytes, list/tuple, or scalar
    :return: Python-compatible version of the input
    """
    # The former six.PY2 early-return was dead code: this module uses
    # f-strings and is therefore Python-3 only.

    # numpy array
    if hasattr(arr, "dtype"):
        if len(arr.dtype) == 0:
            # plain ndarray: a single dtype string
            dtypes_load = arr.dtype.str.replace("|S", "<U").replace("|O", "<U")
        else:
            # structured array: convert field by field
            dtypes_load = get_loading_dtypes(arr.dtype.descr)
        return arr.astype(dtypes_load)

    # single bytes value
    if isinstance(arr, bytes):
        return arr.decode("utf-8")

    # list or tuple: decode bytes elements, keep everything else
    try:
        arr_fixed = [None] * len(arr)
        for i, x in enumerate(arr):
            arr_fixed[i] = x.decode("utf-8") if isinstance(x, bytes) else x
        return arr_fixed
    # single number or anything non-sized: return unchanged
    except Exception:
        return arr
def save_hdf(filename, arr, **kwargs):
    """
    Save a structured array to an HDF5 file as a single 'data' dataset.

    Any pre-existing content of the file is removed first.

    :param filename: path to the HDF5 file
    :param arr: numpy structured array to save
    :param kwargs: additional arguments passed to h5py.create_dataset
    """
    # bug fix: use a context manager so the file handle is not leaked if
    # clearing or writing raises
    with h5py.File(name=filename, mode="a") as f5:
        # wipe previous content so the file only holds the new dataset
        try:
            f5.clear()
        # older h5py without File.clear: delete datasets one by one
        except Exception:
            for datasetname in f5:
                del f5[datasetname]

        arr_store = set_storing_dtypes(arr)
        f5.create_dataset(name="data", data=arr_store, **kwargs)

    # bug fix: the f-string had lost its placeholder and logged a constant
    LOGGER.info(f"saved {filename}")
def write_to_hdf(filename, arr, name="data", compression="lzf", shuffle=True, **kwargs):
    """
    Write a recarray to a hdf file, overwriting any existing file.

    :param filename: filename of the hdf file
    :param arr: numpy recarray
    :param name: name of the dataset
    :param compression: compression method
    :param shuffle: shuffle data before compression
    :param kwargs: keyword arguments for h5py.File.create_dataset
    """
    dataset_kwargs = dict(compression=compression, shuffle=shuffle, **kwargs)
    with h5py.File(filename, "w") as fh5:
        fh5.create_dataset(name=name, data=arr, **dataset_kwargs)
def load_hdf(filename, first_row=-1, last_row=-1):
    """
    Load a structured array from an HDF5 file's 'data' dataset.

    :param filename: path to the HDF5 file
    :param first_row: first row to load (-1 for beginning)
    :param last_row: last row to load (-1 for end)
    :return: numpy structured array
    :raises Exception: if both rows are given and first_row >= last_row
    """
    # use a context manager so the handle is closed even on errors
    with h5py.File(name=filename, mode="r") as f5:
        if (first_row > -1) & (last_row > -1):
            if first_row < last_row:
                data = np.array(f5["data"][first_row:last_row])
            else:
                # bug fix: this raise was unreachable before because the outer
                # condition also required first_row < last_row, so an invalid
                # range silently loaded the whole dataset instead of failing
                raise Exception(
                    f"first_row ({first_row}) should be smaller "
                    f"than last_row ({last_row})"
                )
        else:
            data = np.array(f5["data"])

    # convert stored bytes columns back to unicode
    safe_dtypes = get_loading_dtypes(data.dtype.descr)
    data = data.astype(safe_dtypes, copy=False)

    # bug fix: the f-string had lost its placeholder and logged a constant
    LOGGER.debug(f"loaded {filename}")

    return data
def save_hdf_cols(filename, arr, compression=None, resizable=False, suppress_log=False):
    """
    Save a structured array to HDF5 with each column as a separate dataset.

    :param filename: path to the HDF5 file
    :param arr: numpy structured array to save
    :param compression: compression method (e.g., 'lzf', 'gzip') or dict of
        create_dataset keyword arguments
    :param resizable: if True, create datasets resizable along axis 0
    :param suppress_log: if True, log at debug level instead of info
    """
    if compression is None:
        kwargs = {}
    elif isinstance(compression, dict):
        # copy so the maxshape entry below does not leak into the caller's dict
        kwargs = dict(compression)
    else:
        kwargs = {"compression": compression}

    dtypes_store = get_storing_dtypes(arr.dtype.descr)

    with h5py.File(filename, "w") as fh5:
        for dt in dtypes_store:
            col = dt[0]
            dtype = dt[1]
            # per-row shape beyond the leading axis; () for scalar columns
            extra_dims = dt[2] if len(dt) > 2 else ()

            if col in fh5:
                del fh5[col]

            if resizable:
                # bug fix: dt[2:] is a tuple *containing* the shape tuple,
                # which produced an invalid maxshape like (None, (3,)) for
                # multi-dimensional columns
                kwargs["maxshape"] = (None,) + tuple(extra_dims)

            fh5.create_dataset(
                name=col, data=arr[col].astype(dtype, copy=False), **kwargs
            )

    # bug fix: the f-string had lost its {filename} placeholder
    log_message = f"saved hdf file {filename} with {len(arr)} rows"
    if suppress_log:
        LOGGER.debug(log_message)
    else:
        LOGGER.info(log_message)
def load_hdf_cols_from_file(
    filename,
    columns="all",
    first_row=0,
    last_row=-1,
    cols_to_add=(),
    selectors=None,
    verb=True,
):
    """
    Loads all columns of an hdf file into one recarray.

    :param filename: path to hdf file
    :param columns: list of columns to load, "all" to load all columns
    :param first_row: first row to load
    :param last_row: last row to load (negative counts from the end)
    :param cols_to_add: list of columns to add to the recarray (zero-filled)
    :param selectors: dictionary of selection masks for columns
    :param verb: if True, print information
    :return: recarray
    :raises Exception: if the first_row/last_row combination is invalid
    """
    with h5py.File(filename, mode="r") as fh5:
        # Infer columns to load
        if columns == "all":
            columns = list(fh5.keys())

        # Infer size of data to load
        if last_row is not None:
            if last_row > 0:
                size = last_row - first_row
            else:
                # negative last_row counts from the end of the dataset
                size = len(fh5[columns[0]]) + last_row
                last_row = size

            if size < 0 or last_row < first_row:
                raise Exception(
                    f"Combination first_row={first_row}, "
                    f"last_row={last_row} invalid"
                )
        else:
            size = len(fh5[columns[0]])
            last_row = size

        # Get boolean mask in case any selectors were specified
        if selectors is not None and len(selectors) > 0:
            # bug fix: np.bool was removed in numpy >= 1.24; use builtin bool
            select = np.ones(size, dtype=bool)

            for cols in selectors:
                select_fun = selectors[cols]

                if isinstance(cols, str):
                    cols = (cols,)

                select &= select_fun(*[fh5[c][first_row:last_row] for c in cols])

            size = np.count_nonzero(select)

            if verb:
                LOGGER.info(
                    f"masking {select.size - size} / {select.size} rows "
                    f"while loading {filename}"
                )
        else:
            select = np.s_[:size]

        # Create array holding loaded data
        dtype_list = [(col, fh5[col].dtype.str, fh5[col].shape[1:]) for col in columns]
        dtypes_load = get_loading_dtypes(dtype_list)
        dtypes_load += [(col, np.float32) for col in cols_to_add]
        arr = np.empty(size, dtype=dtypes_load)

        for col in columns:
            arr[col] = fh5[col][first_row:last_row][select]

        # Newly added columns are initialized to zero
        for col in cols_to_add:
            arr[col] = 0.0

        return arr
def col_name_to_path(dirname, colname):
    """
    Get the HDF5 file path for a column stored in a directory.

    :param dirname: directory path
    :param colname: column name
    :return: full path to the column's HDF5 file
    """
    return os.path.join(dirname, f"{colname}.h5")
def get_hdf_col_names(path):
    """
    Get column names from an HDF5 file or a directory of per-column files.

    :param path: path to HDF5 file or directory
    :return: list of column names
    """
    # file: datasets in the file are the columns
    if os.path.isfile(path):
        with h5py.File(path, mode="r") as fh5:
            columns = list(fh5.keys())
    # directory: each '<name>.h5' file is one column
    else:
        # bug fix: this branch previously ran unconditionally (missing else),
        # overwriting the file result and crashing os.listdir for file paths
        columns = [
            name
            for name, ext in map(os.path.splitext, os.listdir(path))
            if ext == ".h5"
        ]

    return columns
def load_hdf_cols_from_directory(
    dirname,
    columns="all",
    first_row=0,
    last_row=-1,
    copy_local=False,
    dirname_parent=None,
    allow_nonexisting=False,
    cols_to_add=(),
    selectors=None,
    verb=True,
    copy_editor=None,
):
    """
    Load columns stored as individual HDF5 files in a directory into one recarray.

    :param dirname: directory containing column HDF5 files
    :param columns: list of columns to load, or "all" for all columns
    :param first_row: first row to load
    :param last_row: last row to load (negative counts from the end)
    :param copy_local: if True, copy files locally before loading
    :param dirname_parent: parent directory to search for missing columns
    :param allow_nonexisting: if True, don't fail if directory doesn't exist
    :param cols_to_add: additional columns to add (initialized to 0)
    :param selectors: dict of column-based selection functions
    :param verb: if True, print progress information
    :param copy_editor: function to modify paths before copying
    :return: numpy structured array
    :raises Exception: if the first_row/last_row combination is invalid
    """
    if selectors is None:
        selectors = dict()

    if copy_editor is None:
        copy_editor = lambda p: p  # noqa

    # Infer columns to load
    columns_all = []
    if not allow_nonexisting or os.path.exists(dirname):
        columns_all += [
            name
            for name, ext in map(os.path.splitext, os.listdir(dirname))
            if ext == ".h5"
        ]
    if dirname_parent is not None:
        columns_all += [
            name
            for name, ext in map(os.path.splitext, os.listdir(dirname_parent))
            if ext == ".h5"
        ]
    columns_all = list(set(columns_all))

    columns = columns_all if columns == "all" else list(columns)

    # Get filename
    # (for all columns s.t. one can also use selectors
    # using columns that are not loaded)
    dict_filename_cols = {}
    for col in set(columns_all + columns + list(selectors.keys())):
        # get the filename from the right folder
        filename_col = col_name_to_path(dirname, col)

        if not os.path.isfile(filename_col) and dirname_parent is not None:
            filename_col = col_name_to_path(dirname_parent, col)

        dict_filename_cols[col] = filename_col

    # Infer size of data to load
    if last_row is not None:
        if last_row > 0:
            size = last_row - first_row
        else:
            # negative last_row counts from the end of the first column
            with h5py.File(dict_filename_cols[columns[0]], mode="r") as fh5:
                size = len(fh5[columns[0]]) + last_row
                last_row = size

        if size < 0 or last_row < first_row:
            raise Exception(
                f"Combination first_row={first_row}, "
                f"last_row={last_row} invalid"
            )
    else:
        with h5py.File(dict_filename_cols[columns[0]], mode="r") as fh5:
            size = len(fh5[columns[0]])
            last_row = size

    # Get boolean mask in case any selectors were specified
    columns_copied = {}

    if len(selectors) > 0:
        # bug fix: np.bool was removed in numpy >= 1.24; use the builtin bool
        select = np.ones(size, dtype=bool)

        for cols in selectors:
            select_fun = selectors[cols]

            if isinstance(cols, str):
                cols = (cols,)

            data_select = [None] * len(cols)

            for ic, c in enumerate(cols):
                filename_col = dict_filename_cols[c]

                if copy_local:
                    if c not in columns_copied:
                        path_local = os.path.join(
                            os.getcwd(), os.path.basename(filename_col)
                        )
                        file_utils.robust_copy(
                            copy_editor(filename_col),
                            path_local,
                            use_copyfile=True,
                        )
                        path_load = path_local
                        columns_copied[c] = path_load
                    else:
                        path_load = columns_copied[c]
                else:
                    path_load = filename_col

                with h5py.File(path_load, mode="r") as fh5:
                    data_select[ic] = fh5[c][first_row:last_row]

            select &= select_fun(*data_select)

        size = np.count_nonzero(select)
        del data_select

        if verb:
            LOGGER.info(
                f"masking {select.size - size} / {select.size} rows "
                f"while loading {dirname}"
            )
    else:
        select = np.s_[:size]

    # Create array holding loaded data
    dtypes_list = [None] * len(columns)

    for i, col in enumerate(columns):
        with h5py.File(dict_filename_cols[col], mode="r") as fh5:
            dtypes_list[i] = (col, fh5[col].dtype.str, fh5[col].shape[1:])

    dtypes_load = get_loading_dtypes(dtypes_list)
    dtypes_load += [(col, np.float32) for col in cols_to_add]
    arr = np.empty(size, dtype=dtypes_load)

    # Now copy files one by one and read data
    for col in columns:
        filename_col = dict_filename_cols[col]

        if copy_local:
            if col in columns_copied:
                path_local = columns_copied[col]
            else:
                path_local = os.path.join(os.getcwd(), os.path.basename(filename_col))
                file_utils.robust_copy(
                    copy_editor(filename_col), path_local, use_copyfile=True
                )
                columns_copied[col] = path_local

            path_load = path_local
        else:
            path_load = filename_col

        with h5py.File(path_load, mode="r") as fh5:
            arr[col] = fh5[col][first_row:last_row][select]

        # local copies are removed as soon as their data is read
        if copy_local:
            os.remove(path_load)
            del columns_copied[col]

    # Remove remaining columns (columns used for selecting only)
    for path_local in columns_copied.values():
        os.remove(path_local)

    # Set newly added columns to zero
    for col in cols_to_add:
        arr[col] = 0.0

    return arr
def load_hdf_cols(
    filename,
    columns="all",
    first_row=0,
    last_row=None,
    verb=True,
    copy_local=True,
    filename_parent=None,
    allow_nonexisting=False,
    cols_to_add=(),
    selectors=None,
    copy_editor=None,
):
    """
    Load columns from an HDF5 file or directory of HDF5 files.

    Automatically detects whether filename is a file or directory and
    dispatches to the matching loader.

    :param filename: path to HDF5 file or directory
    :param columns: list of columns to load, or "all" for all columns
    :param first_row: first row to load
    :param last_row: last row to load (None for all rows)
    :param verb: if True, print progress information
    :param copy_local: if True, copy files locally before loading
        (directory mode only)
    :param filename_parent: parent path to search for missing columns
    :param allow_nonexisting: if True, don't fail if path doesn't exist
    :param cols_to_add: additional columns to add (initialized to 0)
    :param selectors: dict of column-based selection functions
    :param copy_editor: function to modify paths before copying
    :return: numpy structured array
    :raises Exception: if first_row >= last_row for a positive last_row
    """
    if last_row is not None and (last_row > 0) & (first_row >= last_row):
        raise Exception(
            f"first_row {first_row} should be smaller than last_row {last_row}"
        )

    if os.path.isfile(filename):
        arr = load_hdf_cols_from_file(
            filename,
            columns=columns,
            first_row=first_row,
            last_row=last_row,
            cols_to_add=cols_to_add,
            selectors=selectors,
            verb=verb,
        )
    else:
        arr = load_hdf_cols_from_directory(
            filename,
            columns=columns,
            first_row=first_row,
            last_row=last_row,
            copy_local=copy_local,
            dirname_parent=filename_parent,
            allow_nonexisting=allow_nonexisting,
            cols_to_add=cols_to_add,
            selectors=selectors,
            verb=verb,
            copy_editor=copy_editor,
        )

    if verb:
        # bug fix: the f-string had lost its {filename} placeholder
        LOGGER.info(f"loaded {filename} with n_rows={len(arr)}")

    return arr
def append_rows_to_h5dset(dset, array):
    """
    Append rows to an existing HDF5 dataset.

    :param dset: h5py dataset (must be resizable along axis 0)
    :param array: numpy array with the rows to append
    """
    n_old = dset.shape[0]
    n_new = n_old + array.shape[0]
    dset.resize(n_new, axis=0)
    dset[n_old:] = set_storing_dtypes(array)
def replace_hdf5_dataset(fobj, name, data, **kwargs):
    """
    Replace or create a dataset in an open HDF5 file.

    The data is converted with :func:`set_storing_dtypes` before writing.

    :param fobj: h5py File object
    :param name: dataset name
    :param data: data to write
    :param kwargs: additional arguments for create_dataset
    """
    # drop a previous dataset of the same name, if any
    try:
        del fobj[name]
    except KeyError:
        pass
    fobj.create_dataset(name=name, data=set_storing_dtypes(data), **kwargs)
def overwrite_hdf5_column(path, name, data, **kwargs):
    """
    Overwrite a column in an existing HDF5 file.

    :param path: path to HDF5 file or directory of per-column files
    :param name: column/dataset name
    :param data: new data to write
    :param kwargs: additional arguments for create_dataset
    """
    # directory layout stores one file per column
    path_col = col_name_to_path(path, name) if os.path.isdir(path) else path

    with h5py.File(path_col, mode="r+") as fh5:
        # bug fix: replace_hdf5_dataset already applies set_storing_dtypes;
        # applying it here as well corrupted plain strings
        # (str -> bytes -> list of ints on the second pass)
        replace_hdf5_dataset(fh5, name, data, **kwargs)
def check_hdf_column(filename, column_name):
    """
    Check if a column exists in an HDF5 file or directory.

    :param filename: path to HDF5 file or directory of per-column files
    :param column_name: name of the column to check
    :return: True if column exists, False otherwise
    """
    # single file: the column is a dataset inside it
    if os.path.isfile(filename):
        with h5py.File(filename, mode="r") as fh5:
            return column_name in fh5

    # directory: the column is a '<name>.h5' file
    return os.path.isfile(col_name_to_path(filename, column_name))
def rec_float64_to_float32(cat):
    """
    Convert float64 columns in a structured array to float32.

    :param cat: numpy structured array
    :return: structured array with float32 instead of float64; the input
        itself if no float64 column was found
    """
    n_fields = len(cat.dtype)
    has_float64 = any(cat.dtype[i] == np.float64 for i in range(n_fields))

    # nothing to convert: hand back the original array
    if not has_float64:
        return cat

    new_formats = [
        np.float32 if cat.dtype[i] == np.float64 else cat.dtype[i]
        for i in range(n_fields)
    ]
    new_dtype = np.dtype(dict(formats=new_formats, names=cat.dtype.names))
    return cat.astype(new_dtype)
def save_dict_to_hdf5(filename, data_dict, kw_compress=None):
    """
    Save a nested dictionary to HDF5 with groups and datasets.

    :param filename: path to the HDF5 file
    :param data_dict: nested dict {group_name: {dataset_name: data}}
    :param kw_compress: compression kwargs (default: lzf with shuffle)
    """
    # copy so the defaults below do not leak into the caller's dict
    kw_compress = {} if kw_compress is None else dict(kw_compress)
    kw_compress.setdefault("compression", "lzf")
    kw_compress.setdefault("shuffle", True)

    # bug fix: use a context manager so the file handle is not leaked when
    # creating a group or dataset raises
    with h5py.File(filename, "w") as f:
        for grp_name, datasets in data_dict.items():
            grp = f.create_group(grp_name)
            for dset_name, data in datasets.items():
                grp.create_dataset(dset_name, data=data, **kw_compress)
def nanequal(a, b):
    """
    Element-wise equality comparison that treats NaN == NaN as True.

    :param a: first array
    :param b: second array
    :return: boolean array
    """
    both_nan = np.isnan(a) & np.isnan(b)
    return both_nan | (a == b)
def append_hdf(filename, arr, compression=None, **kwargs):
    """
    Append structured array data to an HDF5 file's 'data' dataset.

    Creates the file (with a resizable dataset) if it doesn't exist,
    appends to the existing dataset otherwise.

    :param filename: path to the HDF5 file
    :param arr: numpy structured array to append
    :param compression: optional compression method for dataset creation
    :param kwargs: additional arguments for h5py.create_dataset
        (only used when the file is first created)
    """
    if not os.path.exists(filename):
        # bug fix: caller-supplied **kwargs were previously discarded here
        # by rebinding kwargs to a fresh dict
        if compression is not None:
            kwargs["compression"] = compression

        arr_store = set_storing_dtypes(arr)

        with h5py.File(filename, "w") as f5:
            # maxshape=(None,) makes axis 0 resizable for later appends
            f5.create_dataset(
                name="data", data=arr_store, maxshape=(None,), chunks=True, **kwargs
            )
        LOGGER.debug(f"created hdf file {filename} with {len(arr)} rows")
    else:
        with h5py.File(filename, "a") as fh5:
            append_rows_to_h5dset(fh5["data"], arr)
        LOGGER.debug(
            f"appended to hdf file {filename} with {len(arr)} additional rows"
        )