Coverage for src / cosmic_toolbox / arraytools.py: 83%

466 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-31 12:38 +0000

1""" 

2Array utilities for working with numpy structured arrays and HDF5 files. 

3 

4Provides functions for: 

5- Converting between arrays, recarrays, dicts, dataframes, and classes 

6- Adding, removing, and manipulating columns in structured arrays 

7- Reading and writing HDF5 files with various storage formats 

8- Handling NaN/Inf values and dtype conversions 

9 

10""" 

11 

12import os 

13 

14import h5py 

15import numpy as np 

16import pandas as pd 

17import six 

18 

19from cosmic_toolbox import file_utils, logger 

20 

21LOGGER = logger.get_logger(__file__) 

22 

23 

def view_fields(rec, names):
    """
    Return a view of `rec` restricted to the given field names.

    :param rec: numpy structured array
    :param names: collection of field names to keep
    :return: view of `rec` (not a copy) containing only the requested fields
    """
    fields = rec.dtype.fields
    sub_dtype = np.dtype(
        dict(
            names=names,
            formats=[fields[name][0] for name in names],
            offsets=[fields[name][1] for name in names],
            # keep the full itemsize so the view maps onto the same memory
            itemsize=rec.dtype.itemsize,
        )
    )
    return rec.view(sub_dtype)

40 

41 

def delete_cols(rec, col_names):
    """
    Delete columns from a numpy recarray.

    :param rec: numpy recarray
    :param col_names: list of names of the columns to delete
    :return: numpy recarray without the dropped columns
    """
    to_drop = set(col_names)
    keep = [name for name in rec.dtype.names if name not in to_drop]

    if not keep:
        # nothing left: return an empty array with an empty dtype
        return np.array([], dtype=[])

    return rec[keep].copy()

61 

62 

def delete_columns(rec, col_names):
    """
    Delete columns from a numpy recarray.
    (alias for delete_cols for backwards compatibility)

    :param rec: numpy recarray
    :param col_names: list of names of the columns to delete
    :return: numpy recarray without the dropped columns
    """
    return delete_cols(rec, col_names)

72 

73 

def add_cols(rec, names, shapes=None, data=0, dtype=None):
    """
    Return a copy of `rec` extended with new columns.

    By default, the new columns are filled with zeros. If `data` is a
    2d numpy array of shape (n_cols, n_rows), row i fills the i-th new
    column; any other value is broadcast into every new column.

    :param rec: numpy recarray
    :param names: list of names for the columns
    :param shapes: list of shapes for the columns
    :param data: data to fill the columns with
    :param dtype: dtype of the columns
    :return: numpy recarray
    """
    # per-column fill data is only recognized as a 2d ndarray
    per_column = isinstance(data, np.ndarray) and data.ndim == 2

    # build the widened dtype and an empty output array
    names = [str(name) for name in names]
    added_dtype = get_dtype(names, shapes=shapes, main=dtype)
    merged_dtype = np.dtype(rec.dtype.descr + added_dtype.descr)
    out = np.empty(rec.shape, dtype=merged_dtype)

    # copy over the original columns
    for col in rec.dtype.fields:
        out[col] = rec[col]

    # fill the new columns
    for idx, col in enumerate(added_dtype.fields):
        out[col] = data[idx] if per_column else np.array(data).astype(dtype)

    return out

109 

110 

def ensure_cols(rec, names, shapes=None, data=0):
    """
    Ensure columns exist in a recarray, adding them if missing.

    :param rec: numpy recarray
    :param names: list of column names to ensure
    :param shapes: list of shapes for the columns
    :param data: data to fill new columns with
    :return: numpy recarray with ensured columns
    """
    # names may carry a ':dtype' suffix; compare on the bare column name
    names = [str(name) for name in names]
    missing = [
        name for name in names if get_dtype([name]).names[0] not in rec.dtype.names
    ]

    # nothing to do if every column already exists
    if not missing:
        return rec

    return add_cols(rec, missing, shapes=shapes, data=data)

136 

137 

def arr2rec(arr, names):
    """
    Convert a plain numpy array to a numpy structured array.

    :param arr: numpy array of shape (n_rows, n_cols)
    :param names: list of names for the columns
    :return: numpy structured array
    """
    arr = np.atleast_2d(arr)
    n_rows, n_cols = arr.shape
    # each column keeps the dtype it has in the input array
    formats = [arr[:, col].dtype for col in range(n_cols)]
    rec = np.zeros(n_rows, dtype=dict(names=names, formats=formats))
    for col, name in enumerate(names):
        rec[name] = arr[:, col]
    return rec

155 

156 

def rec2arr(rec, return_names=False):
    """
    Convert a numpy structured array to a plain 2d numpy array.

    :param rec: numpy structured array
    :param return_names: if True, also return the column names
    :return: numpy array of shape (n_rows, n_cols), plus the names
        if requested

    Example
    -------
    >>> rec = np.array([(1, 4), (2, 4), (3, 4)],
                       dtype=[('a', '<i8'), ('b', '<i8')])
    >>> rec2arr(rec)
    array([[1, 4], [2, 4], [3, 4]])
    """
    columns = rec.dtype.names
    # stack columns row-wise, then transpose to (n_rows, n_cols)
    arr = np.array([rec[name] for name in columns]).T
    if return_names:
        return arr, columns
    return arr

183 

184 

def dict2rec(d):
    """
    Convert a dictionary of arrays/lists/scalars to a numpy structured array.

    Scalars are broadcast across all rows; arrays with extra trailing
    dimensions become sub-array fields.

    :param d: Dictionary with arrays/lists/scalars as values
    :return: numpy structured array
    """
    # Convert iterable (non-string) values to numpy arrays
    d = {
        k: np.array(v) if hasattr(v, "__iter__") and not isinstance(v, str) else v
        for k, v in d.items()
    }

    # Get first dimension size from array values, default to 1 for scalar-only dict
    array_sizes = [v.shape[0] for v in d.values() if hasattr(v, "shape") and v.shape]
    size = array_sizes[0] if array_sizes else 1

    # Create dtype list; trailing dimensions become sub-array shapes
    dtype_list = []
    for key, val in d.items():
        if hasattr(val, "dtype"):
            if val.ndim > 1:
                dtype_list.append((key, val.dtype, val.shape[1:]))
            else:
                dtype_list.append((key, val.dtype))
        else:
            # scalar value: infer dtype via numpy
            dtype_list.append((key, np.array(val).dtype))

    # Create and fill the structured array.
    # (The original `np.empty(1) if size == 1 else np.empty(size)` was
    # redundant, and the fill loop had two identical if/else branches.)
    rec = np.empty(size, dtype=dtype_list)
    for key, val in d.items():
        rec[key] = val

    return rec

226 

227 

def rec2dict(rec):
    """
    Convert a numpy structured array to a dictionary of column arrays.

    :param rec: numpy structured array
    :return: dict mapping each column name to its array

    Example
    -------
    >>> rec = np.array([(1, 4), (2, 4), (3, 4)],
                       dtype=[('a', '<i8'), ('b', '<i8')])
    >>> rec2dict(rec)
    {'a': array([1, 2, 3]), 'b': array([4, 4, 4])}
    """
    return {name: rec[name] for name in rec.dtype.names}

248 

249 

def dict2class(d):
    """
    Convert a dictionary to an object with one attribute per key.

    :param d: dictionary
    :return: object exposing the dict entries as attributes

    Example
    -------
    >>> c = dict2class({'a': [1, 2, 3], 'b': 4})
    >>> c.a
    [1, 2, 3]
    >>> c.b
    4
    """

    class C:
        pass

    obj = C()
    for key, value in d.items():
        setattr(obj, key, value)
    return obj

274 

275 

def rec2class(rec):
    """
    Convert a numpy structured array to an object with one attribute
    per column.

    :param rec: numpy structured array
    :return: object exposing the columns as attributes
    """

    class C:
        pass

    obj = C()
    for name in rec.dtype.names:
        setattr(obj, name, rec[name])
    return obj

284 

285 

def class2dict(c):
    """
    Convert an object's attributes to a dictionary.

    :param c: object with a ``__dict__``
    :return: the object's attribute dictionary (live, not a copy)
    """
    return c.__dict__

295 

296 

def class2rec(c):
    """
    Convert a class instance to a numpy structured array.

    The instance's attributes become columns (via ``class2dict`` followed
    by ``dict2rec``), so they should be arrays/lists/scalars.

    :param c: class instance
    :return: numpy structured array
    """
    return dict2rec(class2dict(c))

305 

306 

def pd2rec(df):
    """
    Convert a pandas dataframe to a numpy structured array.

    :param df: pandas dataframe
    :return: numpy record array (index is dropped)
    """
    records = df.to_records(index=False)
    return records

315 

316 

def rec2pd(rec):
    """
    Convert a numpy structured array to a pandas DataFrame.

    Sub-array columns are expanded into scalar columns named
    '<name>_0', '<name>_1', ... (only the first extra dimension is
    expanded).

    :param rec: numpy structured array
    :return: pandas DataFrame
    """
    columns = {}
    for name in rec.dtype.names:
        field_shape = rec.dtype[name].shape
        if field_shape:
            # flatten the sub-array field into one column per entry
            for idx in range(field_shape[0]):
                columns[f"{name}_{idx}"] = rec[name][:, idx]
        else:
            columns[name] = rec[name]
    return pd.DataFrame(columns)

336 

337 

def get_nan_mask(rec):
    """
    Boolean mask marking rows that contain a NaN in any column.

    Note: every column must support ``np.isnan`` (i.e. be float-like).

    :param rec: numpy structured array
    :return: boolean numpy array, True where a row contains a NaN
    """
    mask = np.zeros(len(rec), dtype=bool)
    for name in rec.dtype.names:
        mask = mask | np.isnan(rec[name])
    return mask

349 

350 

def remove_nans(rec):
    """
    Remove rows containing a NaN in any column.

    :param rec: numpy structured array (float-like columns)
    :return: numpy structured array without NaN rows
    """
    keep = np.ones(len(rec), dtype=bool)
    for name in rec.dtype.names:
        keep &= ~np.isnan(rec[name])
    return rec[keep]

360 

361 

def get_inf_mask(rec):
    """
    Boolean mask marking rows that contain +/-inf in any column.

    :param rec: numpy structured array (float-like columns)
    :return: boolean numpy array, True where a row contains an inf
    """
    mask = np.zeros(len(rec), dtype=bool)
    for name in rec.dtype.names:
        mask = mask | np.isinf(rec[name])
    return mask

373 

374 

def remove_infs(rec):
    """
    Remove rows containing +/-inf in any column.

    :param rec: numpy structured array (float-like columns)
    :return: numpy structured array without inf rows
    """
    keep = np.ones(len(rec), dtype=bool)
    for name in rec.dtype.names:
        keep &= ~np.isinf(rec[name])
    return rec[keep]

384 

385 

def get_finite_mask(rec):
    """
    Boolean mask marking rows whose values are all finite
    (no NaNs, no infs).

    :param rec: numpy structured array (float-like columns)
    :return: boolean numpy array, True for fully finite rows
    """
    # np.isfinite(x) is equivalent to ~(np.isnan(x) | np.isinf(x))
    mask = np.ones(len(rec), dtype=bool)
    for name in rec.dtype.names:
        mask &= np.isfinite(rec[name])
    return mask

397 

398 

def select_finite(rec):
    """
    Remove rows containing NaNs or infs.

    :param rec: numpy structured array (float-like columns)
    :return: numpy structured array with only finite rows
    """
    keep = np.ones(len(rec), dtype=bool)
    for name in rec.dtype.names:
        keep &= np.isfinite(rec[name])
    return rec[keep]

407 

408 

def arr_to_rec(arr, dtype):
    """
    Convert a numpy array to a numpy record array given its dtype.

    :param arr: numpy array of shape (n_rows, n_cols)
    :param dtype: dtype of the structured array
    :return: numpy record array
    """
    # np.rec.fromarrays is the public alias; np.core.records was removed
    # from the public API in numpy 2.0
    return np.rec.fromarrays(np.array(arr).transpose(), dtype=dtype)

419 

420 

def get_dtype_of_list(lst):
    """
    Get the common dtype of all elements in a list (must be uniform).

    :param lst: list of arrays
    :return: numpy dtype of the elements
    :raises AssertionError: if not all elements share the same dtype
    """
    arr = np.array(lst)
    dtype = arr[0].dtype

    # single element: nothing to compare against
    if len(arr) > 1:
        uniform = all(element.dtype == dtype for element in arr[1:])
        assert uniform, "Not all entries of the list have the same dtype"

    return dtype

446 

447 

def new_array(n_rows, columns, ints=None, float_dtype=np.float64, int_dtype=np.int64):
    """
    Create a new zero-initialized structured array.

    :param n_rows: number of rows
    :param columns: list of column names
    :param ints: list of column names that should be integers
    :param float_dtype: dtype for float columns
    :param int_dtype: dtype for integer columns
    :return: numpy structured array initialized with zeros
    """
    int_cols = set(ints) if ints is not None else set()
    dtype = np.dtype(
        [(col, int_dtype if col in int_cols else float_dtype) for col in columns]
    )
    return np.zeros(n_rows, dtype=dtype)

470 

471 

def get_dtype(columns, main="f8", shapes=None):
    """
    Create a numpy dtype from column names.

    Column names may embed an explicit dtype as 'name:dtype'.

    :param columns: list of column names (optionally with ':dtype' suffix)
    :param main: default dtype for columns without explicit dtype
    :param shapes: list of shapes, one per column
    :return: numpy dtype
    """
    if main is None:
        main = "f8"
    elif isinstance(main, type):
        # allow passing e.g. ``float`` instead of the string "float"
        main = main.__name__

    if shapes is None:
        shapes = [() for _ in columns]

    names = []
    formats = []
    for col in columns:
        if ":" in col:
            name, fmt = col.split(":")
        else:
            name, fmt = col, main
        names.append(str(name))
        formats.append(str(fmt))

    return np.dtype(list(zip(names, formats, shapes)))

505 

506 

def get_storing_dtypes(dtype_list):
    """
    Convert a dtype description list for HDF5 storage (unicode to bytes).

    :param dtype_list: list of (name, dtype, ...) tuples
    :return: list of storage-compatible dtype tuples
    """
    if six.PY2:
        # python 2: just force plain str names/formats
        return [(str(dt[0]), str(dt[1])) + dt[2:] for dt in dtype_list]

    # python 3: unicode string fields become fixed-width byte strings
    return [(dt[0], dt[1].replace("<U", "|S")) + dt[2:] for dt in dtype_list]

521 

522 

def get_loading_dtypes(dtype_list):
    """
    Convert a dtype description list for loading from HDF5 (bytes to unicode).

    :param dtype_list: list of (name, dtype, ...) tuples
    :return: list of Python-compatible dtype tuples
    """
    if six.PY2:
        return [(str(dt[0]), str(dt[1])) + dt[2:] for dt in dtype_list]

    converted = []
    for dt in dtype_list:
        # h5py >= 2.10.0 may report the format as a tuple
        fmt = dt[1][0] if isinstance(dt[1], tuple) else dt[1]
        converted.append((dt[0], fmt.replace("|S", "<U")) + dt[2:])
    return converted

543 

544 

def set_storing_dtypes(arr):
    """
    Convert an object's dtypes for HDF5 storage (unicode strings to bytes).

    Branch order matters: numpy arrays are detected first (via ``dtype``),
    then plain strings, then anything with a length; everything else is
    returned unchanged.

    :param arr: numpy array, string, list/tuple, or scalar
    :return: storage-compatible version of the input
    """
    if six.PY2:
        # python 2 strings are already bytes, nothing to convert
        return arr

    # numpy array
    elif hasattr(arr, "dtype"):
        dtype = arr.dtype

        # plain ndarray (no named fields): single dtype string to convert
        if len(dtype) == 0:
            dtypes_store = arr.dtype.str.replace("<U", "|S")

        # recarray: convert every field description
        else:
            dtypes_store = get_storing_dtypes(arr.dtype.descr)

        return arr.astype(dtypes_store)

    # string
    elif isinstance(arr, str):
        return arr.encode("utf-8")

    else:
        # list or tuple: encode any string elements, keep the rest as-is
        try:
            arr_fixed = [None] * len(arr)

            for i, x in enumerate(arr):
                if isinstance(x, str):
                    arr_fixed[i] = x.encode("utf-8")
                else:
                    arr_fixed[i] = x

            return arr_fixed

        # single number (len() failed): nothing to convert
        except Exception:
            return arr

589 

590 

def set_loading_dtypes(arr):
    """
    Convert an object's dtypes after loading from HDF5 (bytes to unicode
    strings).

    Branch order matters: numpy arrays are detected first (via ``dtype``),
    then bytes, then anything with a length; everything else is returned
    unchanged.

    :param arr: numpy array, bytes, list/tuple, or scalar
    :return: Python-compatible version of the input
    """
    if six.PY2:
        # python 2: bytes and str are the same type, nothing to do
        return arr

    # numpy array
    elif hasattr(arr, "dtype"):
        dtype = arr.dtype

        # plain ndarray (no named fields): single dtype string to convert
        # NOTE(review): the '|O' replacement presumably targets object
        # arrays holding strings -- confirm against the writers used
        if len(dtype) == 0:
            dtypes_load = arr.dtype.str.replace("|S", "<U").replace("|O", "<U")

        # recarray: convert every field description
        else:
            dtypes_load = get_loading_dtypes(arr.dtype.descr)

        return arr.astype(dtypes_load)

    # bytes
    elif isinstance(arr, bytes):
        return arr.decode("utf-8")

    else:
        # list or tuple: decode any bytes elements, keep the rest as-is
        try:
            arr_fixed = [None] * len(arr)

            for i, x in enumerate(arr):
                if isinstance(x, bytes):
                    arr_fixed[i] = x.decode("utf-8")
                else:
                    arr_fixed[i] = x

            return arr_fixed

        # single number (len() failed): nothing to convert
        except Exception:
            return arr

635 

636 

def save_hdf(filename, arr, **kwargs):
    """
    Save a structured array to an HDF5 file as a single 'data' dataset.

    Any pre-existing datasets in the file are removed first.

    :param filename: path to the HDF5 file
    :param arr: numpy structured array to save
    :param kwargs: additional arguments passed to h5py.create_dataset
    """
    # context manager guarantees the file is closed even if an
    # exception is raised (the previous explicit close() leaked the
    # handle on error)
    with h5py.File(name=filename, mode="a") as f5:
        try:
            f5.clear()

        except Exception:
            # fallback for h5py versions without File.clear()
            for datasetname in f5:
                del f5[datasetname]

        arr_store = set_storing_dtypes(arr)
        f5.create_dataset(name="data", data=arr_store, **kwargs)

    # log the actual filename (the previous f-string had no placeholder)
    LOGGER.info(f"saved {filename}")

658 

659 

def write_to_hdf(filename, arr, name="data", compression="lzf", shuffle=True, **kwargs):
    """
    Write a recarray to a new hdf file.

    :param filename: filename of the hdf file
    :param arr: numpy recarray
    :param name: name of the dataset
    :param compression: compression method
    :param shuffle: shuffle data before compression
    :param kwargs: keyword arguments for h5py.File.create_dataset
    """
    with h5py.File(filename, "w") as fh5:
        fh5.create_dataset(
            name=name,
            data=arr,
            compression=compression,
            shuffle=shuffle,
            **kwargs,
        )

675 

676 

def load_hdf(filename, first_row=-1, last_row=-1):
    """
    Load a structured array from an HDF5 file's 'data' dataset.

    :param filename: path to the HDF5 file
    :param first_row: first row to load (-1 for beginning)
    :param last_row: last row to load (-1 for end)
    :return: numpy structured array
    :raises Exception: if first_row >= last_row when both are given
    """
    # context manager closes the file even on error
    with h5py.File(name=filename, mode="r") as f5:
        if (first_row > -1) and (last_row > -1):
            # the original outer condition also required
            # first_row < last_row, which made the raise below
            # unreachable and silently loaded the full dataset for
            # invalid ranges
            if first_row < last_row:
                data = np.array(f5["data"][first_row:last_row])

            else:
                raise Exception(
                    f"first_row ({first_row}) should be smaller "
                    f"than last_row ({last_row})"
                )
        else:
            data = np.array(f5["data"])

    # convert stored byte strings back to unicode
    safe_dtypes = get_loading_dtypes(data.dtype.descr)
    data = data.astype(safe_dtypes, copy=False)

    LOGGER.debug(f"loaded {filename}")

    return data

708 

709 

def save_hdf_cols(filename, arr, compression=None, resizable=False, suppress_log=False):
    """
    Save a structured array to HDF5 with each column as a separate dataset.

    :param filename: path to the HDF5 file
    :param arr: numpy structured array to save
    :param compression: compression method (e.g., 'lzf', 'gzip') or a dict
                        of create_dataset keyword arguments
    :param resizable: if True, create datasets resizable along axis 0
    :param suppress_log: if True, log at debug level instead of info
    """
    if compression is None:
        kwargs = {}
    elif isinstance(compression, dict):
        kwargs = compression
    else:
        kwargs = {"compression": compression}

    dtypes_store = get_storing_dtypes(arr.dtype.descr)

    with h5py.File(filename, "w") as fh5:
        for dt in dtypes_store:
            col = dt[0]
            dtype = dt[1]
            # dt is (name, dtype) or (name, dtype, shape); the previous
            # `dt[2:]` kept the shape nested in an extra tuple, which
            # produced an invalid maxshape for multi-dimensional columns
            extra_dims = tuple(dt[2]) if len(dt) > 2 else ()

            if col in fh5:
                del fh5[col]

            if resizable:
                # unlimited along the row axis, fixed trailing dims
                kwargs["maxshape"] = (None,) + extra_dims

            fh5.create_dataset(
                name=col, data=arr[col].astype(dtype, copy=False), **kwargs
            )

    # log the actual filename (placeholder was lost in the original)
    log_message = f"saved hdf file {filename} with {len(arr)} rows"
    if suppress_log:
        LOGGER.debug(log_message)
    else:
        LOGGER.info(log_message)

751 

752 

def load_hdf_cols_from_file(
    filename,
    columns="all",
    first_row=0,
    last_row=-1,
    cols_to_add=(),
    selectors=None,
    verb=True,
):
    """
    Loads all columns of an hdf file into one recarray.

    :param filename: path to hdf file
    :param columns: list of columns to load, "all" to load all columns
    :param first_row: first row to load
    :param last_row: last row to load (None for all; <=0 counts from the end)
    :param cols_to_add: list of float32 columns to add, initialized to 0
    :param selectors: dictionary of selection masks for columns
    :param verb: if True, print information
    :return: recarray
    :raises Exception: if the first_row/last_row combination is invalid
    """
    with h5py.File(filename, mode="r") as fh5:
        # Infer columns to load
        if columns == "all":
            columns = list(fh5.keys())

        # Infer size of data to load
        if last_row is not None:
            if last_row > 0:
                size = last_row - first_row
            else:
                # non-positive last_row counts back from the dataset end
                size = len(fh5[columns[0]]) + last_row
                last_row = size

            if size < 0 or last_row < first_row:
                raise Exception(
                    f"Combination first_row={first_row}, "
                    f"last_row={last_row} invalid"
                )
        else:
            size = len(fh5[columns[0]])
            last_row = size

        # Get boolean mask in case any selectors were specified
        if selectors is not None and len(selectors) > 0:
            # np.bool was removed in numpy >= 1.24; use the builtin bool
            select = np.ones(size, dtype=bool)

            for cols in selectors:
                select_fun = selectors[cols]

                if isinstance(cols, str):
                    cols = (cols,)

                select &= select_fun(*[fh5[c][first_row:last_row] for c in cols])

            size = np.count_nonzero(select)

            if verb:
                LOGGER.info(
                    f"masking {select.size - size} / {select.size} rows "
                    f"while loading {filename}"
                )

        else:
            select = np.s_[:size]

        # Create array holding loaded data
        dtype_list = [(col, fh5[col].dtype.str, fh5[col].shape[1:]) for col in columns]
        dtypes_load = get_loading_dtypes(dtype_list)
        dtypes_load += [(col, np.float32) for col in cols_to_add]
        arr = np.empty(size, dtype=dtypes_load)

        for col in columns:
            arr[col] = fh5[col][first_row:last_row][select]

        for col in cols_to_add:
            arr[col] = 0.0

    return arr

832 

833 

def col_name_to_path(dirname, colname):
    """
    Get the HDF5 file path for a column stored in a directory.

    :param dirname: directory path
    :param colname: column name
    :return: full path to the column's HDF5 file
    """
    return os.path.join(dirname, f"{colname}.h5")

843 

844 

def get_hdf_col_names(path):
    """
    Get column names from an HDF5 file or directory of HDF5 files.

    :param path: path to HDF5 file or directory
    :return: list of column names
    """
    # file: dataset names are the column names
    if os.path.isfile(path):
        with h5py.File(path, mode="r") as fh5:
            columns = list(fh5.keys())

    # directory: every '<name>.h5' file is one column
    # (previously this branch ran unconditionally, clobbering the file
    # result and crashing os.listdir when path was a file)
    else:
        columns = [
            name
            for name, ext in map(os.path.splitext, os.listdir(path))
            if ext == ".h5"
        ]

    return columns

863 

864 

def load_hdf_cols_from_directory(
    dirname,
    columns="all",
    first_row=0,
    last_row=-1,
    copy_local=False,
    dirname_parent=None,
    allow_nonexisting=False,
    cols_to_add=(),
    selectors=None,
    verb=True,
    copy_editor=None,
):
    """
    Load columns stored as individual HDF5 files in a directory into one recarray.

    :param dirname: directory containing column HDF5 files
    :param columns: list of columns to load, or "all" for all columns
    :param first_row: first row to load
    :param last_row: last row to load (non-positive counts from the end)
    :param copy_local: if True, copy files locally before loading
    :param dirname_parent: parent directory to search for missing columns
    :param allow_nonexisting: if True, don't fail if directory doesn't exist
    :param cols_to_add: additional float32 columns to add (initialized to 0)
    :param selectors: dict of column-based selection functions
    :param verb: if True, print progress information
    :param copy_editor: function to modify paths before copying
    :return: numpy structured array
    :raises Exception: if the first_row/last_row combination is invalid
    """
    if selectors is None:
        selectors = dict()

    if copy_editor is None:
        copy_editor = lambda p: p  # noqa

    # Infer columns to load
    columns_all = []
    if not allow_nonexisting or os.path.exists(dirname):
        columns_all += [
            name
            for name, ext in map(os.path.splitext, os.listdir(dirname))
            if ext == ".h5"
        ]
    if dirname_parent is not None:
        columns_all += [
            name
            for name, ext in map(os.path.splitext, os.listdir(dirname_parent))
            if ext == ".h5"
        ]
    columns_all = list(set(columns_all))

    columns = columns_all if columns == "all" else list(columns)

    # Get filename
    # (for all columns s.t. one can also use selectors
    # using columns that are not loaded)
    dict_filename_cols = {}
    for col in set(columns_all + columns + list(selectors.keys())):
        # prefer the main folder, fall back to the parent folder
        filename_col = col_name_to_path(dirname, col)

        if not os.path.isfile(filename_col) and dirname_parent is not None:
            filename_col = col_name_to_path(dirname_parent, col)

        dict_filename_cols[col] = filename_col

    # Infer size of data to load
    if last_row is not None:
        if last_row > 0:
            size = last_row - first_row
        else:
            with h5py.File(dict_filename_cols[columns[0]], mode="r") as fh5:
                size = len(fh5[columns[0]]) + last_row
            last_row = size

        if size < 0 or last_row < first_row:
            raise Exception(
                f"Combination first_row={first_row}, "
                f"last_row={last_row} invalid"
            )
    else:
        with h5py.File(dict_filename_cols[columns[0]], mode="r") as fh5:
            size = len(fh5[columns[0]])
        last_row = size

    # Get boolean mask in case any selectors were specified
    columns_copied = {}

    if len(selectors) > 0:
        # np.bool was removed in numpy >= 1.24; use the builtin bool
        select = np.ones(size, dtype=bool)

        for cols in selectors:
            select_fun = selectors[cols]

            if isinstance(cols, str):
                cols = (cols,)

            data_select = [None] * len(cols)

            for ic, c in enumerate(cols):
                filename_col = dict_filename_cols[c]

                if copy_local:
                    if c not in columns_copied:
                        path_local = os.path.join(
                            os.getcwd(), os.path.basename(filename_col)
                        )
                        file_utils.robust_copy(
                            copy_editor(filename_col),
                            path_local,
                            use_copyfile=True,
                        )
                        path_load = path_local
                        columns_copied[c] = path_load
                    else:
                        path_load = columns_copied[c]

                else:
                    path_load = filename_col

                with h5py.File(path_load, mode="r") as fh5:
                    data_select[ic] = fh5[c][first_row:last_row]

            select &= select_fun(*data_select)

        size = np.count_nonzero(select)
        del data_select

        if verb:
            LOGGER.info(
                f"masking {select.size - size} / {select.size} rows "
                f"while loading {dirname}"
            )

    else:
        select = np.s_[:size]

    # Create array holding loaded data
    dtypes_list = [None] * len(columns)

    for i, col in enumerate(columns):
        with h5py.File(dict_filename_cols[col], mode="r") as fh5:
            dtypes_list[i] = (col, fh5[col].dtype.str, fh5[col].shape[1:])

    dtypes_load = get_loading_dtypes(dtypes_list)
    dtypes_load += [(col, np.float32) for col in cols_to_add]
    arr = np.empty(size, dtype=dtypes_load)

    # Now copy files one by one and read data
    for col in columns:
        filename_col = dict_filename_cols[col]

        if copy_local:
            if col in columns_copied:
                path_local = columns_copied[col]

            else:
                path_local = os.path.join(os.getcwd(), os.path.basename(filename_col))
                file_utils.robust_copy(
                    copy_editor(filename_col), path_local, use_copyfile=True
                )
                columns_copied[col] = path_local

            path_load = path_local

        else:
            path_load = filename_col

        with h5py.File(path_load, mode="r") as fh5:
            arr[col] = fh5[col][first_row:last_row][select]

        if copy_local:
            os.remove(path_load)
            del columns_copied[col]

    # Remove remaining columns (columns used for selecting only)
    for path_local in columns_copied.values():
        os.remove(path_local)

    # Set newly added columns to zero
    for col in cols_to_add:
        arr[col] = 0.0

    return arr

1049 

1050 

def load_hdf_cols(
    filename,
    columns="all",
    first_row=0,
    last_row=None,
    verb=True,
    copy_local=True,
    filename_parent=None,
    allow_nonexisting=False,
    cols_to_add=(),
    selectors=None,
    copy_editor=None,
):
    """
    Load columns from an HDF5 file or directory of HDF5 files.

    Dispatches to the file or directory loader depending on the path type.

    :param filename: path to HDF5 file or directory
    :param columns: list of columns to load, or "all" for all columns
    :param first_row: first row to load
    :param last_row: last row to load (None for all rows)
    :param verb: if True, print progress information
    :param copy_local: if True, copy files locally before loading
    :param filename_parent: parent path to search for missing columns
    :param allow_nonexisting: if True, don't fail if path doesn't exist
    :param cols_to_add: additional columns to add (initialized to 0)
    :param selectors: dict of column-based selection functions
    :param copy_editor: function to modify paths before copying
    :return: numpy structured array
    :raises Exception: if first_row >= last_row for a positive last_row
    """
    invalid_range = (
        last_row is not None and last_row > 0 and first_row >= last_row
    )
    if invalid_range:
        raise Exception(
            f"first_row {first_row} should be smaller than last_row {last_row}"
        )

    if os.path.isfile(filename):
        arr = load_hdf_cols_from_file(
            filename,
            columns=columns,
            first_row=first_row,
            last_row=last_row,
            cols_to_add=cols_to_add,
            selectors=selectors,
            verb=verb,
        )
    else:
        arr = load_hdf_cols_from_directory(
            filename,
            columns=columns,
            first_row=first_row,
            last_row=last_row,
            copy_local=copy_local,
            dirname_parent=filename_parent,
            allow_nonexisting=allow_nonexisting,
            cols_to_add=cols_to_add,
            selectors=selectors,
            verb=verb,
            copy_editor=copy_editor,
        )

    if verb:
        LOGGER.info(f"loaded (unknown) with n_rows={len(arr)}")

    return arr

1117 

1118 

def append_rows_to_h5dset(dset, array):
    """
    Append rows to an existing (resizable) HDF5 dataset.

    :param dset: h5py dataset, resizable along axis 0
    :param array: numpy array with the rows to append
    """
    n_old = dset.shape[0]
    dset.resize(n_old + array.shape[0], axis=0)
    # convert unicode fields to bytes before writing
    dset[n_old:] = set_storing_dtypes(array)

1130 

1131 

def replace_hdf5_dataset(fobj, name, data, **kwargs):
    """
    Replace or create a dataset in an HDF5 file.

    Any existing dataset under the same name is removed first, since h5py
    refuses to create a dataset under a name that is already taken.

    :param fobj: h5py File object
    :param name: dataset name
    :param data: data to write
    :param kwargs: additional arguments for create_dataset
    """
    try:
        del fobj[name]  # drop the previous dataset with this name
    except KeyError:
        pass  # nothing stored under this name yet
    fobj.create_dataset(name=name, data=set_storing_dtypes(data), **kwargs)

1144 

1145 

def overwrite_hdf5_column(path, name, data, **kwargs):
    """
    Overwrite a column in an existing HDF5 file.

    :param path: path to HDF5 file or directory (in the directory storage
        format each column lives in its own file)
    :param name: column/dataset name
    :param data: new data to write
    :param kwargs: additional arguments for create_dataset
    """
    # resolve the per-column file when the catalog is stored as a directory
    path_col = col_name_to_path(path, name) if os.path.isdir(path) else path

    with h5py.File(path_col, mode="r+") as fh5:
        # replace_hdf5_dataset already applies set_storing_dtypes to the
        # data, so converting here as well was redundant double work
        replace_hdf5_dataset(fh5, name, data, **kwargs)

1159 

1160 

def check_hdf_column(filename, column_name):
    """
    Check if a column exists in an HDF5 file or directory.

    :param filename: path to HDF5 file or directory
    :param column_name: name of the column to check
    :return: True if column exists, False otherwise
    """
    if not os.path.isfile(filename):
        # directory storage format: each column is stored as its own file
        return os.path.isfile(col_name_to_path(filename, column_name))

    # single-file storage format: the column is a dataset inside the file
    with h5py.File(filename, mode="r") as fh5:
        return column_name in fh5

1178 

1179 

def rec_float64_to_float32(cat):
    """
    Convert float64 columns in a structured array to float32.

    :param cat: numpy structured array
    :return: structured array with float32 instead of float64; the input
        array itself is returned unchanged if it has no float64 columns
    """
    n_cols = len(cat.dtype)

    # fast path: nothing to convert, hand back the original array
    if not any(cat.dtype[i] == np.float64 for i in range(n_cols)):
        return cat

    # downcast only the float64 fields, keep every other dtype as-is
    new_formats = [
        np.float32 if cat.dtype[i] == np.float64 else cat.dtype[i]
        for i in range(n_cols)
    ]
    new_dtype = np.dtype(dict(names=cat.dtype.names, formats=new_formats))
    return cat.astype(new_dtype)

1206 

1207 

def save_dict_to_hdf5(filename, data_dict, kw_compress=None):
    """
    Save a nested dictionary to HDF5 with groups and datasets.

    :param filename: path to the HDF5 file
    :param data_dict: nested dict {group_name: {dataset_name: data}}
    :param kw_compress: compression kwargs (default: lzf with shuffle);
        missing keys are filled with the defaults
    """
    # work on a copy so the caller's dict is not mutated by the defaults
    kw_compress = {} if kw_compress is None else dict(kw_compress)
    kw_compress.setdefault("compression", "lzf")
    kw_compress.setdefault("shuffle", True)

    # context manager guarantees the file is closed even if a write raises
    # (previously the file handle leaked on any exception)
    with h5py.File(filename, "w") as f:
        for grp_name, datasets in data_dict.items():
            grp = f.create_group(grp_name)
            for dset_name, data in datasets.items():
                grp.create_dataset(dset_name, data=data, **kw_compress)

1229 

1230 

def nanequal(a, b):
    """
    Element-wise equality comparison that treats NaN == NaN as True.

    Plain ``==`` yields False for NaN pairs; this adds an explicit
    both-are-NaN check on top of the regular comparison.

    :param a: first array
    :param b: second array
    :return: boolean array
    """
    both_nan = np.isnan(a) & np.isnan(b)
    return (a == b) | both_nan

1240 

1241 

def append_hdf(filename, arr, compression=None, **kwargs):
    """
    Append structured array data to HDF5 file.

    Creates the file with a resizable "data" dataset if it does not exist,
    otherwise appends the rows to the existing "data" dataset.

    :param filename: path to the HDF5 file
    :param arr: numpy array with the rows to store
    :param compression: compression filter for create_dataset
        (only applied when the file is first created)
    :param kwargs: additional arguments for create_dataset
        (only applied when the file is first created)
    """
    if not os.path.exists(filename):
        # bug fix: kwargs used to be reset to {} here, silently discarding
        # any caller-supplied create_dataset options
        if compression is not None:
            kwargs.setdefault("compression", compression)

        arr_store = set_storing_dtypes(arr)

        # maxshape=(None,) makes the dataset resizable for later appends
        with h5py.File(filename, "w") as f5:
            f5.create_dataset(
                name="data", data=arr_store, maxshape=(None,), chunks=True, **kwargs
            )
        LOGGER.debug(f"created hdf file {filename} with {len(arr)} rows")
    else:
        # file exists: grow the "data" dataset and write the new rows
        with h5py.File(filename, "a") as fh5:
            append_rows_to_h5dset(fh5["data"], arr)
        LOGGER.debug(
            f"appended to hdf file {filename} with {len(arr)} additional rows"
        )