Skip to content

API

Under Construction

Core Module (serval.core.py)

PowerBeamVisProjector

Bases: VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
power_beam_lmax int
required
frequencies_MHz ndarray
required
baseline_lmax int | None
None
sky_lmax int | None
None
sky_absm_limits list[Optional[int]]
[0, None]
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
generate_pointing_contractor_on_init bool
False
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt']
'channel'
pointing_contract bool
False
pointing_altitude float | None
None
pointing_azimuth float | None
None
pointing_boresight float | None
None
pointed_beam_mmax int | None
None

Attributes:

Name Type Description
pointing_contractor ndarray | None
Source code in src/serval/core.py
@define(eq=False)
class PowerBeamVisProjector(VisProjector):
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    power_beam_lmax: int
    frequencies_MHz: np.ndarray

    baseline_lmax: int | None = None
    sky_lmax: int | None = None
    sky_absm_limits: list[int | None] = field(factory=lambda: [0, None])
    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    generate_pointing_contractor_on_init: bool = False
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel"
    pointing_contract: bool = False
    pointing_altitude: float | None = None
    pointing_azimuth: float | None = None
    pointing_boresight: float | None = None
    pointed_beam_mmax: int | None = None

    pointing_contractor: np.ndarray | None = field(init=False, default=None)

    _to_attrs: ClassVar[list[str]] = [
        "latitude",
        "longitude",
        "power_beam_lmax",
        "baseline_lmax",
        "sky_lmax",
        "baseline_enu",
        "pointing_contract",
        "pointing_altitude",
        "pointing_azimuth",
        "pointing_boresight",
        "pointed_beam_mmax",
    ]
    _to_zarr_store: ClassVar[list[str]] = ["frequencies_MHz"]
    _opt_tag: ClassVar[str] = "opt12"

    def __attrs_post_init__(self):
        if self.generate_baseline_cache_on_init:
            self.generate_baseline_cache()
        lmax, self.baseline_mmax = baseline_bandlimits(
            self.baseline_enu,
            self.frequencies_MHz.max(),
            self.latitude,
            self.longitude,
        )
        if self.baseline_lmax is None:
            self.baseline_lmax = lmax
        self.set_bandlimits_from_baseline()
        if self.generate_gaunt_cache_on_init:
            self.generate_gaunt_cache()
        else:
            self.update_integrator()
        if self.pointing_contract:
            if self.pointed_beam_mmax is None:
                raise ValueError(
                    "Cannot determine pointing_contractor with the inputs given, "
                    "because pointed_beam_mmax is None."
                )
            if self.pointed_beam_mmax > self.power_beam_lmax:
                raise ValueError(
                    "pointed_beam_mmax cannot be larger than power_beam_lmax."
                )
            if self.generate_pointing_contractor_on_init:
                self.generate_pointing_contractor()

    def generate_pointing_contractor(self):
        self.pointing_contractor = wigner_d(
            self.power_beam_lmax,
            self.pointed_beam_mmax,
            tirs_to_pointing(
                latitude=self.latitude,
                longitude=self.longitude,
                altitude=self.pointing_altitude,
                azimuth=self.pointing_azimuth,
                boresight=self.pointing_boresight,
            ).inv(),
        )

    def generate_baseline_cache(self):
        self.baseline_cache = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=self.frequencies_MHz,
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
        )
        self.baseline_lmax = self.baseline_cache.shape[1] - 1

    def set_bandlimits_from_baseline(self):
        if self.sky_lmax is None:
            # No warning here, as it's the default case.
            self.sky_lmax = self.baseline_lmax + self.power_beam_lmax
        elif self.sky_lmax > self.baseline_lmax + self.power_beam_lmax:
            self.sky_lmax = self.baseline_lmax + self.power_beam_lmax
            warnings.warn(
                "The value of sky_lmax is too large. It is changed to the sum of baseline_lmax and "
                "power_beam_lmax which is " + str(self.sky_lmax) + ".",
                stacklevel=2,  # points to caller
            )
        elif self.sky_lmax < self.baseline_lmax + self.power_beam_lmax:
            warnings.warn(
                "The value of sky_lmax is smaller than the optimal value given the "
                "baseline and beam which is "
                + str(self.baseline_lmax + self.power_beam_lmax)
                + ". Consider changing it.",
                stacklevel=2,
            )  # points to caller
        if self.sky_absm_limits[1] is None:
            self.sky_absm_limits = (
                0,
                min(self.baseline_mmax + self.power_beam_lmax, self.sky_lmax) + 1,
            )
            # No warning here as it's a very common case to want this auto-generated.
        else:
            if self.sky_absm_limits[1] > self.sky_lmax + 1:
                self.sky_absm_limits = list(self.sky_absm_limits)
                self.sky_absm_limits[1] = self.sky_lmax + 1
                warnings.warn(
                    "The upper limit for the mmodes of the sky cannot be larger than "
                    "sky_lmax + 1. It is changed to "
                    + str(self.sky_absm_limits[1])
                    + ".",
                    stacklevel=2,
                )  # points to caller
            if self.sky_absm_limits[1] - 1 != self.baseline_mmax + self.power_beam_lmax:
                warnings.warn(
                    "The upper limit for the mmodes of the sky does not correspond to the " \
                    "optimal value given the baseline and beam which is "
                    + str(self.baseline_mmax + self.power_beam_lmax)
                    + ". Consider changing it.",
                    stacklevel=2,
                )  # points to caller
            self.sky_absm_limits = tuple(self.sky_absm_limits)

    def beam_linear_mmode_generator(self, sky_alms, release_gaunt_cache: bool = True):
        sky_alms = extend_dimensions_if_one_batch(sky_alms, 3)
        if sky_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The sky_alms coefficients do not correspond to the number of frequency bins."
            )
        if sky_alms.shape[1] != self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the expected sky_lmax "
                "(baseline_lmax + power_beam_lmax)."
            )
        if sky_alms.shape[2] != 2 * self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the full harmonic orders "
                "of sky_lmax (baseline_lmax + power_beam_lmax) -> (ms = 2 * sky_lmax + 1)."
            )
        if self.baseline_cache is not None:
            self.generate_baseline_cache()
        if self.baseline_cache is None:
            raise ValueError("baseline_cache is None. It must be generated beforehand.")
        if self.pointing_contract and self.pointing_contractor is None:
            self.generate_pointing_contractor()
        self.triple_sh_integrator.generate_integrator_cache_12(
            sky_alms,
            self.baseline_cache,
            contract3=self.pointing_contractor,
            release_gaunt_cache=release_gaunt_cache,
            batch_parallel_mode=self.batch_parallel_mode,
        )
        self.integrator_cache = self.triple_sh_integrator.linear_integrator_cache_12
        return self.integrator_cache

    def mmodes(self, beam_alms, sky_alms=None):
        beam_alms = extend_dimensions_if_one_batch(beam_alms, 3)
        if beam_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the number of frequency bins."
            )
        if (
            beam_alms.shape[1] != self.power_beam_lmax + 1
        ):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the inputted power_beam_lmax."
            )
        if self.pointing_contract:
            if beam_alms.shape[2] != 2 * self.pointed_beam_mmax + 1:
                raise ValueError(
                    "The beam_alms coefficients do not correspond to the expected "
                    "pointed_beam_mmax."
                )
        else:
            if beam_alms.shape[2] != 2 * self.power_beam_lmax + 1:
                raise ValueError(
                    "The beam_alms coefficients do not correspond to the full harmonic orders "
                    "of power_beam_lmax -> (ms = 2 * power_beam_lmax + 1)."
                )
        if sky_alms is None and self.integrator_cache is None:
            raise ValueError("Must specify either sky_alms or integrator cache.")
        else:
            if sky_alms is None:
                integrator_cache = self.integrator_cache
            else:
                integrator_cache = self.beam_linear_mmode_generator(sky_alms=sky_alms)
            integrator_cache = extend_dimensions_if_one_batch(integrator_cache, 4)
            return self.triple_sh_integrator.batch_gaunt_integrate_cached_12(
                beam_alms, integrator_cache=integrator_cache
            )

    def visibilities(
        self,
        beam_alms,
        sky_alms=None,
        return_mmodes=False
    ):
        mmodes = self.mmodes(beam_alms=beam_alms, sky_alms=sky_alms)
        vis = mmodes_to_visibilities(mmodes, self.sky_lmax, ms=self.sky_m_values)
        if return_mmodes:
            return vis, mmodes
        else:
            return vis

    def setup_zarr_store(
        self,
        store_location: str | Path,
        frequency_chunksize: int | None = None,
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        if self.pointing_contract:
            num_beam_ms = 2 * self.pointed_beam_mmax + 1
        else:
            num_beam_ms = 2 * self.power_beam_lmax + 1
        integrator_cache_shape = (
            self.frequencies_MHz.size,
            2 * self.sky_lmax + 1,
            self.power_beam_lmax + 1,
            num_beam_ms,
        )
        integrator_cache_chunks = (
            (
                frequency_chunksize
                if frequency_chunksize is not None
                else self.frequencies_MHz.size
            ),
            1,  # per m-chunking for parallel writes
            self.power_beam_lmax + 1,
            num_beam_ms,
        )
        self._setup_zarr_store(
            store_location=store_location,
            group_path=group_path,
            integrator_cache_shape=integrator_cache_shape,
            integrator_cache_chunks=integrator_cache_chunks,
            store_metadata_attrs=store_metadata_attrs,
        )

SingleVisibilityGenerator

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
frequency_MHz float
required
power_beam_alm ndarray[tuple[Any, ...], dtype[complex128]]
required
sky_alm ndarray[tuple[Any, ...], dtype[complex128]]
required
method Literal['gaunt', 'gaunt_sparse', 'grid']
'gaunt'
baseline_lmax int | None
None
sky_absm_limits tuple[int, Optional[int]]
(0, None)
baseline_alm None | ndarray[tuple[Any, ...], dtype[complex128]]
None

Attributes:

Name Type Description
sky_lmax int
power_beam_lmax int
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class SingleVisibilityGenerator:
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    frequency_MHz: float
    power_beam_alm: npt.NDArray[np.complex128]
    sky_alm: npt.NDArray[np.complex128]

    method: Literal["gaunt", "gaunt_sparse", "grid"] = "gaunt"
    baseline_lmax: int | None = None

    sky_absm_limits: tuple[int, int | None] = (0, None)

    baseline_alm: None | npt.NDArray[np.complex128] = None

    sky_lmax: int = field(init=False)
    power_beam_lmax: int = field(init=False)
    triple_sh_integrator: TripleSHIntegrator = field(init=False)

    def generate_baseline_cache(self):
        self.baseline_alm = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=np.atleast_1d(self.frequency_MHz)[None, :],
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
        )[0, 0, ...]  # TODO Figure out broadcasting here... The shape of the matrix:
        # [None from np.atleast_1s, len(frequencies_MHz)=1, lmax + 1, 2 * lmax + 1]
        self.baseline_lmax = self.baseline_alm.shape[0] - 1

    def __attrs_post_init__(self):
        if not isinstance(self.frequency_MHz, float | int):
            raise TypeError("frequency_MHz must be a float or integer")
        if self.baseline_alm is None:
            self.generate_baseline_cache()
        elif self.baseline_lmax is None:
            self.baseline_lmax = self.baseline_alm.shape[0] - 1
        else:
            if self.baseline_lmax != self.baseline_alm.shape[0] - 1:
                raise ValueError(
                    "The defined harmonic degree baseline_lmax does not "
                    "correspond to the defined baseline_alm coefficients"
                )
        self.sky_lmax = self.sky_alm.shape[0] - 1
        self.power_beam_lmax = self.power_beam_alm.shape[0] - 1
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )

    @property
    def sky_m_values(self):
        return self.triple_sh_integrator.m1_values

    @property
    def sky_m_index_map(self):
        return self.triple_sh_integrator.m1_index_map

    @property
    def num_sky_ms(self):
        return len(self.sky_m_values)

    def mmodes(
        self,
    ):
        if self.method == "grid":
            return self.triple_sh_integrator.grid_integrate(
                alm1=self.sky_alm, alm2=self.baseline_alm, alm3=self.power_beam_alm
            )
        elif self.method == "gaunt_sparse":
            return self.triple_sh_integrator.sparse_gaunt_einsum(
                sparse_alm1=sparse.COO.from_numpy(self.sky_alm),
                sparse_alm2=sparse.COO.from_numpy(self.baseline_alm),
                sparse_alm3=sparse.COO.from_numpy(self.power_beam_alm),
            )
        elif self.method == "gaunt":
            return self.triple_sh_integrator.gaunt_integrate(
                alm1=self.sky_alm, alm2=self.baseline_alm, alm3=self.power_beam_alm
            )

    def visibilities(self, return_mmodes=False):
        mmodes = self.mmodes()
        vis = mmodes_to_visibilities(mmodes, m1max=self.sky_lmax, ms=self.sky_m_values)
        if return_mmodes:
            return vis, mmodes
        else:
            return vis

SkyVisProjector

Bases: VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
sky_lmax int
required
frequencies_MHz ndarray
required
baseline_lmax int | None
None
power_beam_lmax int | None
None
sky_absm_limits list[Optional[int]]
[0, None]
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
batch_parallel_mode Literal['channel', 'gaunt']
'channel'
Source code in src/serval/core.py
@define(eq=False)
class SkyVisProjector(VisProjector):
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    sky_lmax: int
    frequencies_MHz: np.ndarray

    baseline_lmax: int | None = None
    power_beam_lmax: int | None = None
    sky_absm_limits: list[int | None] = field(factory=lambda: [0, None])
    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    batch_parallel_mode: Literal["channel", "gaunt"] = "channel"

    _to_attrs = [
        "latitude",
        "longitude",
        "sky_lmax",
        "power_beam_lmax",
        "baseline_lmax",
        "baseline_enu",
    ]
    _to_zarr_store = ["frequencies_MHz"]

    def __attrs_post_init__(self):
        if self.generate_baseline_cache_on_init:
            self.generate_baseline_cache()
        if self.power_beam_lmax is None:
            if self.baseline_lmax is not None:
                if self.power_beam_lmax != self.sky_lmax - self.baseline_lmax:
                    warnings.warn(
                        "The value of power_beam_lmax is changed to the difference "
                        "of baseline_lmax from sky_lmax.",
                        stacklevel=2,  # points to caller
                    )
                    self.power_beam_lmax = self.sky_lmax - self.baseline_lmax
            else:
                raise ValueError(
                    "Cannot determine power_beam_lmax with inputs given.\n"
                    "Either baseline_cache needs to be computed on input, baseline_lmax must be "
                    "specified or power_beam_lmax must be specified."
                )
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )
        if self.generate_gaunt_cache_on_init:
            self.generate_gaunt_cache()

    def generate_baseline_cache(self):
        self.baseline_cache = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=self.frequencies_MHz,
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
        )
        # Can assert this is the same as input if not none.
        self.baseline_lmax = self.baseline_cache.shape[1] - 1
        # Only changed here:
        if self.power_beam_lmax is None:
            warnings.warn(
                "The value of power_beam_lmax is changed to the difference "
                "of baseline_lmax from sky_lmax.",
                stacklevel=2,  # points to caller
            )
            self.power_beam_lmax = self.sky_lmax - self.baseline_lmax

    def sky_linear_mmode_generator(self, beam_alms):
        beam_alms = extend_dimensions_if_one_batch(beam_alms, 3)
        if beam_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the number of frequency bins."
            )
        if (
            beam_alms.shape[1] != self.power_beam_lmax + 1
            or beam_alms.shape[2] != 2 * self.power_beam_lmax + 1
        ):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the expected power_beam_lmax "
                "(sky_lmax - baseline_lmax). Either number ls or ms is incorrect."
            )
        self.triple_sh_integrator.generate_integrator_cache_23(
            alm2=self.baseline_cache,
            alm3=beam_alms,
            batch_parallel_mode=self.batch_parallel_mode,
        )
        self.integrator_cache = self.triple_sh_integrator.linear_integrator_cache_23
        return self.integrator_cache

    def mmodes(self, sky_alms, beam_alms=None):
        sky_alms = extend_dimensions_if_one_batch(sky_alms, 3)
        if sky_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The sky_alms coefficients do not correspond to the number of frequency bins."
            )
        if sky_alms.shape[1] != self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the inputted sky_lmax."
            )
        if sky_alms.shape[2] != 2 * self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the expected harmonic orders "
                "for the inputted sky_lmax (ms = 2 * sky_lmax + 1)."
            )
        cut_sky_alms = sky_alms[
            ..., list(self.sky_m_index_map.keys())
        ]  # Need to select only the valid ms

        if beam_alms is None and self.integrator_cache is None:
            raise ValueError("Must specify either beam_alms or integrator cache.")
        else:
            if beam_alms is None:
                integrator_cache = self.integrator_cache
            else:
                integrator_cache = self.sky_linear_mmode_generator(beam_alms=beam_alms)
            integrator_cache = extend_dimensions_if_one_batch(integrator_cache, 3)
            return self.triple_sh_integrator.batch_gaunt_integrate_cached_23(
                alm1=cut_sky_alms, integrator_cache=integrator_cache
            )

    def visibilities(
        self,
        sky_alms,
        beam_alms=None,
        return_mmodes=False,
    ):
        mmodes = self.mmodes(beam_alms=beam_alms, sky_alms=sky_alms)
        vis = mmodes_to_visibilities(mmodes, self.sky_lmax, ms=self.sky_m_values)
        if return_mmodes:
            return vis, mmodes
        else:
            return vis

    def setup_zarr_store(
        self,
        store_location: str | Path,
        frequency_chunksize: int | None = None,
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        integrator_cache_shape = (
            self.frequencies_MHz.size,
            2 * self.sky_lmax + 1,
            self.sky_lmax + 1,
        )
        integrator_cache_chunks = (
            (
                frequency_chunksize
                if frequency_chunksize is not None
                else self.frequencies_MHz.size
            ),
            1,  # per m-chunking for parallel writes
            self.power_beam_lmax + 1,
        )
        self._setup_zarr_store(
            store_location=store_location,
            group_path=group_path,
            integrator_cache_shape=integrator_cache_shape,
            integrator_cache_chunks=integrator_cache_chunks,
            store_metadata_attrs=store_metadata_attrs,
        )

TripleSHIntegrator

A class for performing integrals of the products of three functions on the sphere expressed as their spherical harmonic transformations.

This class provides methods for this integration performed with a variety of techniques using Gaunt coefficients and grid-based approaches. It supports caching for efficient repeated computations when changing only one of the functions. Additionally it allows for computing the Fourier transform of these integrals in an azimuthal rotation angle of one of the functions (always the 1st) around the others, ie. the \(m\)-modes.

That is,

\[ \begin{align} \mathcal{V} &= \int\diff\Omega~f_1(\dirvec)f_2(\dirvec)f_3(\dirvec) \\ &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}} a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3} \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec) Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \end{align} \]

where

\[ f_1(\dirvec) = \sum_{\ell_1=0}^{l_{1,\mathrm{max}}} \sum_{m_1=-\ell_1}^{\ell_1} a_{\ell_1 m_1} Y_{\ell_1 m_1}(\dirvec) \]

etc.

For the \(m\)-modes:

\[ \begin{align} \mathcal{V}_{m_1} &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_2 m_3}} a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3} \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec) Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \\ &= \int \frac{\diff\phi}{2\pi} \exp[-im_1\phi] \int\diff\Omega~f_1( \matr{\mathcal{R}}_Z(-\phi)\dirvec)f_2(\dirvec)f_3(\dirvec) \end{align} \]

where \(\matr{\mathcal{R}}_Z(-\phi)\) is a passive basis rotation about the polar axis of the sphere.

Notes

The index of the functions only matters in differentiating from the others the first function/set of coefficients as the integration can be split up in \(m_1\) values. Additionally integrals can be performed rotating this function azimuthally around the others. That is, for the purposes of this code, this represents the sky.

Parameters:

Name Type Description Default
l1max int

Maximum spherical harmonic degree for the first set of coefficients.

required
l2max int

Maximum spherical harmonic degree for the second set of coefficients.

required
l3max int

Maximum spherical harmonic degree for the third set of coefficients.

required
absm1_limits tuple[int, int | None]

Inclusive lower and exclusive upper limits for the absolute value of the m1 index. Defaults to (0, None), ie, all m1 values.

(0, None)
generate_cache_on_init bool

Whether to generate the Gaunt coefficient cache during initialization, by default False.

False

Attributes:

Name Type Description
m1_values list[int]

List of m1 values that integrations are performed for.

m1_index_map dict[int, int]

Mapping of m1 values to their indices in a completed set of m1 values.

linear_integrator_cache_12 ndarray[tuple[Any, ...], dtype[complex128]] | None
linear_integrator_cache_23 ndarray[tuple[Any, ...], dtype[complex128]] | None
gaunt_cache CachingGauntContractor | CachingGauntContractorOpt12 | None
m1_global_index list[int]

Methods:

Name Description
generate_gaunt_cache

Generates the Gaunt coefficient cache.

clear_gaunt_cache

Clears the Gaunt coefficient cache.

sparse_gaunt_einsum

Performs sparse tensor contraction using computed Gaunt coefficients.

grid_integrate

Performs integration using a grid-based approach.

gaunt_integrate

Performs integration using a direct sum over precomputed Gaunt coefficients.

linear_gaunt_integrator_12

Computes a linear integrator for fixed first and second sets of coefficients.

generate_integrator_cache_12

batch_parallel_mode="channel") Generates and caches the linear integrator for fixed first and second sets of coefficients.

batch_gaunt_integrate_cached_12

Uses a integrator cache for fixed first and second sets of coefficients to compute integrals over a batch of the third set of coefficients.

linear_gaunt_integrator_23

Computes a linear integrator for fixed second and third sets of coefficients.

generate_integrator_cache_23

Generates and caches the linear integrator for fixed second and third sets of coefficients.

batch_gaunt_integrate_cached_12

Uses a integrator cache for fixed second and third sets of coefficients to compute integrals over a bacth of the first set of coefficients.

Source code in src/serval/core.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
@define(eq=False)
class TripleSHIntegrator:
    r"""
    A class for performing integrals of the products of three functions on the
    sphere expressed as their spherical harmonic transformations.

    This class provides methods for this integration performed with a variety
    of techniques using Gaunt coefficients and grid-based approaches. It
    supports caching for efficient repeated computations when changing only
    one of the functions. Additionally it allows for computing the Fourier
    transform of these integrals in an azimuthal rotation angle of one of
    the functions (always the 1st) around the others, ie. the $m$-modes.

    That is,

    $$
    \begin{align}
    \mathcal{V} &= \int\diff\Omega~f_1(\dirvec)f_2(\dirvec)f_3(\dirvec) \\
                &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                  a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}
                  \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec)
                  Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec),
    \end{align}
    $$

    where

    $$
    f_1(\dirvec) = \sum_{\ell_1=0}^{l_{1,\mathrm{max}}}
                   \sum_{m_1=-\ell_1}^{\ell_1} a_{\ell_1 m_1} Y_{\ell_1 m_1}(\dirvec)
    $$

    etc.

    For the $m$-modes:

    $$
    \begin{align}
    \mathcal{V}_{m_1} &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_2 m_3}}
                        a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}
                        \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec)
                        Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \\
                      &=
                        \int \frac{\diff\phi}{2\pi} \exp[-im_1\phi] \int\diff\Omega~f_1(
                        \matr{\mathcal{R}}_Z(-\phi)\dirvec)f_2(\dirvec)f_3(\dirvec)
    \end{align}
    $$

    where $\matr{\mathcal{R}}_Z(-\phi)$ is a passive basis rotation about the polar axis
    of the sphere.

    Notes
    -----
    The index of the functions only matters in differentiating from the others the first
    function/set of coefficients as the integration can be split up in $m_1$ values.
    Additionally integrals can be performed rotating this function azimuthally around
    the others. That is, for the purposes of this code, this represents the sky.


    Parameters
    ----------
    l1max : int
        Maximum spherical harmonic degree for the first set of coefficients.
    l2max : int
        Maximum spherical harmonic degree for the second set of coefficients.
    l3max : int
        Maximum spherical harmonic degree for the third set of coefficients.
    absm1_limits : tuple[int, int | None], optional
        Inclusive lower and exclusive upper limits for the absolute value of
        the m1 index. Defaults to (0, None), ie, all m1 values.
    generate_cache_on_init : bool, optional
        Whether to generate the Gaunt coefficient cache during initialization,
        by default False.

    Attributes
    ----------
    l1max : int
        Maximum spherical harmonic degree for the first set of coefficients.
    l2max : int
        Maximum spherical harmonic degree for the second set of coefficients.
    l3max : int
        Maximum spherical harmonic degree for the third set of coefficients.
    absm1_limits : tuple[int, int | None]
        Inclusive lower and exclusive upper limits for the absolute value of
        the m1 index.
    generate_cache_on_init : bool
        Whether to generate the Gaunt coefficient cache during initialization.
    m1_values : list[int]
        List of m1 values that integrations are performed for.
    m1_index_map : dict[int, int]
        Mapping of m1 values to their indices in a completed set of m1 values.

    Methods
    -------
    generate_gaunt_cache(cache_type=CachingGauntContractor)
        Generates the Gaunt coefficient cache.
    clear_gaunt_cache()
        Clears the Gaunt coefficient cache.
    sparse_gaunt_einsum(sparse_alm1, sparse_alm2, sparse_alm3, sum_m1=False, gaunts=None)
        Performs sparse tensor contraction using computed Gaunt coefficients.
    grid_integrate(alm1, alm2, alm3, sum_m1=False)
        Performs integration using a grid-based approach.
    gaunt_integrate(alm1, alm2, alm3, sum_m1=False)
        Performs integration using a direct sum over precomputed Gaunt coefficients.
    linear_gaunt_integrator_12(alm1, alm2, contract3=None, sum_m1=False)
        Computes a linear integrator for fixed first and second sets of coefficients.
    generate_integrator_cache_12(alm1, alm2, contract3=None, release_gaunt_cache=True,
                                 batch_parallel_mode="channel")
        Generates and caches the linear integrator for fixed first and second sets of
        coefficients.
    batch_gaunt_integrate_cached_12(alm3, integrator_cache=None, sum_m1=False)
        Uses a integrator cache for fixed first and second sets of coefficients to
        compute integrals over a batch of the third set of coefficients.
    linear_gaunt_integrator_23(alm2, alm3, sum_m1=False)
        Computes a linear integrator for fixed second and third sets of coefficients.
    generate_integrator_cache_23(alm2, alm3)
        Generates and caches the linear integrator for fixed second and third sets of
        coefficients.
    batch_gaunt_integrate_cached_12(alm1, integrator_cache=None, sum_m1=False)
        Uses a integrator cache for fixed second and third sets of coefficients to
        compute integrals over a bacth of the first set of coefficients.
    """

    l1max: int
    l2max: int
    l3max: int

    absm1_limits: tuple[int, int | None] = (0, None)
    generate_cache_on_init: bool = False

    linear_integrator_cache_12: npt.NDArray[np.complex128] | None = field(
        init=False, default=None
    )
    linear_integrator_cache_23: npt.NDArray[np.complex128] | None = field(
        init=False, default=None
    )
    gaunt_cache: CachingGauntContractor | CachingGauntContractorOpt12 | None = field(
        init=False, default=None
    )
    m1_values: list[int] = field(init=False)
    m1_index_map: dict[int, int] = field(init=False)
    m1_global_index: list[int] = field(init=False)

    _cache_types: ClassVar[
        dict[str, CachingGauntContractor | CachingGauntContractorOpt12]
    ] = {
        "opt12": CachingGauntContractorOpt12,
        "generic": CachingGauntContractor,
    }

    def __attrs_post_init__(self):
        if self.absm1_limits[1] is None:
            self.absm1_limits = (self.absm1_limits[0], self.l1max + 1)
        if self.generate_cache_on_init:
            self.generate_gaunt_cache()
        self.m1_values, self.m1_index_map = m1_indexing(
            self.l1max, self.absm1_limits[0], self.absm1_limits[1]
        )
        self.m1_global_index = list(self.m1_index_map.keys())

    def generate_gaunt_cache(
        self, cache_type_tag: Literal["opt12", "generic"] = "generic"
    ):
        """Generates the Gaunt coefficient cache."""
        self.gaunt_cache = self._cache_types[cache_type_tag](
            self.l1max,
            self.l2max,
            self.l3max,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def clear_gaunt_cache(self):
        """Clears the Gaunt coefficient cache."""
        self.gaunt_cache = None

    def sparse_gaunt_einsum(
        self,
        sparse_alm1: sparse.SparseArray,
        sparse_alm2: sparse.SparseArray,
        sparse_alm3: sparse.SparseArray,
        sum_m1: bool = False,
        gaunts: sparse.COO | None = None,
    ) -> sparse.COO:
        r"""
        Perform the triple integral using a sparse tensor contraction.

        Compute the triple integral by generating or using a sparse
        representation of the Gaunt coefficients and performing the sum
        with a sparse einsum. This is mostly a cross-checking function
        and is not performant.

        Parameters
        ----------
        sparse_alm1 : sparse.SparseArray
            Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).
        sparse_alm2 : sparse.SparseArray
            Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).
        sparse_alm3 : sparse.SparseArray
            Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.
        gaunts : sparse.COO | None, optional
            Sparse representation of the Gaunt coefficients if already.
            computed, by default None.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        if gaunts is None:
            gaunts = gaunts_coo(self.l1max, self.l2max, self.l3max)
        if sum_m1:
            return sparse.einsum(
                "ijklmn, ij, kl, mn -> ", gaunts, sparse_alm1, sparse_alm2, sparse_alm3
            )
        else:
            return sparse.einsum(
                    "ijklmn, ij, kl, mn -> j",
                    gaunts,
                    sparse_alm1,
                    sparse_alm2,
                    sparse_alm3,
                ).todense()

    def grid_integrate(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        r"""Perform the triple integral using a grid-based approach.

        Here the product of the second and third fields are evaluated on a
        consistent resolution grid, decomposed to conjugate spherical
        harmonics and multipled by the first set of coefficients. This more
        standard m-mode approach is performant for single integrals
        but, as implemented here, can't cache intermediate results and
        therefore not suitable for batching.


        Parameters
        ---------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        alm3 : npt.NDArray[np.complex128]
            The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        grid_lmax = int(np.ceil(self.l1max + self.l2max + self.l3max))
        alm1, alm2, alm3 = broadcast_bandlimits(alm1, alm2, alm3, lmax=grid_lmax)
        static_grid = grid_template(grid_lmax)
        static_grid.data[:] = 1.0
        static_grid *= array_synthesis(alm2) * array_synthesis(alm3)
        static_alm = analysis(static_grid, conjugate=True, return_sparse=False)
        out_mmodes = compute_mmodes(static_alm, alm1)
        return out_mmodes[np.asarray(self.m1_values) + grid_lmax]

    def gaunt_integrate(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128] | float:
        r"""Perform the triple integral by doing and inplace
        sum-product over gaunt coefficients.

        $$
        \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                    \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                    a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}.
        $$

        Here the sum is performed inplace with no caching.

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        alm3 : npt.NDArray[np.complex128]
            The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        return gaunt_dot123(
            alm1=alm1,
            alm2=alm2,
            alm3=alm3,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def linear_gaunt_integrator_12(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        contract3: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        r"""Generate a linear operator that, with fixed first and second set of coefficients
        performs the integral when sum-producted with the third set of coefficients.

        $$
        \matr{L}_{m_1}^{\ell_3 m_3} =
                    \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}}
                    \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                    a_{\ell_1 m_1}a_{\ell_2 m_2}.
        $$

        If requested, additionally contract with a linear operator along the
        m3 axis (e.g. a rotation with a Wigner-d matrix),

        $$
        \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3}
                                             \matr{L}_{m_1}^{\ell_3 m_3}
        $$

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        contract3 : npt.NDArray[np.complex128] | None, optional
            An optional linear operator along the m3 axis, shape
            (l3max+1, 2*l3max+1, Nm3prime<=2*l3max+1), by default None
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128]
            By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array
            where the m1 axis has been summed over. If contract3 is present, this will instead
            be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent
            contraction on the m3 axis.
        """
        tensor = gaunt_dot12(
            alm1=alm1,
            alm2=alm2,
            l3max=self.l3max,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )
        if contract3 is not None:
            return integrator12_contract3(tensor, contract3)
        else:
            return tensor

    def generate_integrator_cache_12(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        contract3: npt.NDArray[np.complex128] | None = None,
        release_gaunt_cache: bool = True,
        batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel-opt",
    ):
        """_summary_

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            _description_
        alm2 : npt.NDArray[np.complex128]
            _description_
        contract3 : npt.NDArray[np.complex128] | None, optional
            _description_, by default None
        release_gaunt_cache : bool, optional
            _description_, by default True
        batch_parallel_mode : Literal["channel-opt", "channel", "gaunt"], optional
            _description_, by default "channel-opt"
        """
        cache_type_tag: Literal["opt12", "generic"] = (
            "opt12" if batch_parallel_mode == "channel-opt" else "generic"
        )
        if self.gaunt_cache is None:
            self.generate_gaunt_cache(cache_type_tag)
        if not isinstance(self.gaunt_cache, self._cache_types[cache_type_tag]):
            raise TypeError(f"gaunt_cache must be a {self._cache_types[cache_type_tag]} object.")
        assert self.gaunt_cache is not None
        alm1 = extend_dimensions_if_one_batch(alm1, 3)
        alm2 = extend_dimensions_if_one_batch(alm2, 3)
        result = np.zeros(
            (
                alm1.shape[0],
                len(self.m1_values),
                self.l3max + 1,
                2 * self.l3max + 1,
            ),
            dtype=np.complex128,
        )
        if batch_parallel_mode in ["channel", "channel-opt"]:
            self.gaunt_cache.batch_parallel_batch_dot12(alm1, alm2, result)
        else:
            self.gaunt_cache.gaunt_parallel_batch_dot12(alm1, alm2, result)
        if release_gaunt_cache:
            self.clear_gaunt_cache()
        if contract3 is not None:
            result = integrator12_contract3(result, contract3)
        if result.shape[0] == 1:
            self.linear_integrator_cache_12 = result[0, ...]
        else:
            self.linear_integrator_cache_12 = result

    def batch_gaunt_integrate_cached_12(
        self,
        alm3: npt.NDArray[np.complex128],
        integrator_cache: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ):
        if integrator_cache is None:
            if self.linear_integrator_cache_12 is None:
                raise ValueError("Integrator cache must be provided or pre-computed")
            else:
                tensor = self.linear_integrator_cache_12
        else:
            tensor = integrator_cache
        result = integrator12_dot3(tensor, alm3)
        if sum_m1:
            return np.sum(result, axis=0)
        else:
            return result

    def linear_gaunt_integrator_23(
        self,
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        return gaunt_dot23(
            alm2=alm2,
            alm3=alm3,
            l1max=self.l1max,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def generate_integrator_cache_23(
        self,
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        release_gaunt_cache: bool = True,
        batch_parallel_mode: Literal["channel", "gaunt"] = "channel",
    ):
        if self.gaunt_cache is None:
            self.generate_gaunt_cache()
        if not isinstance(self.gaunt_cache, CachingGauntContractor):
            raise TypeError("gaunt_cache must be a CachingGauntContractor object.")
        alm2 = extend_dimensions_if_one_batch(alm2, 3)
        alm3 = extend_dimensions_if_one_batch(alm3, 3)
        result = np.zeros(
            (alm2.shape[0], len(self.m1_values), self.l1max + 1), dtype=np.complex128
        )
        if batch_parallel_mode == "channel":
            self.gaunt_cache.batch_parallel_batch_dot23(alm2, alm3, result)
        else:
            self.gaunt_cache.gaunt_parallel_batch_dot23(alm2, alm3, result)
        if release_gaunt_cache:
            self.clear_gaunt_cache()
        if result.shape[0] == 1:
            self.linear_integrator_cache_23 = result[0, ...]
        else:
            self.linear_integrator_cache_23 = result

    def batch_gaunt_integrate_cached_23(
        self,
        alm1: npt.NDArray[np.complex128],
        integrator_cache: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ):
        if integrator_cache is None:
            if self.linear_integrator_cache_23 is None:
                raise ValueError("Integrator cache must be provided or pre-computed")
            else:
                tensor = self.linear_integrator_cache_23
        else:
            tensor = integrator_cache
        result = integrator23_dot1(tensor, alm1)
        if sum_m1:
            return np.sum(result, axis=0)
        else:
            return result

clear_gaunt_cache()

Clears the Gaunt coefficient cache.

Source code in src/serval/core.py
def clear_gaunt_cache(self):
    """Clears the Gaunt coefficient cache."""
    self.gaunt_cache = None

gaunt_integrate(alm1, alm2, alm3, sum_m1=False)

Perform the triple integral by doing and inplace sum-product over gaunt coefficients.

\[ \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}} \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~ a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}. \]

Here the sum is performed inplace with no caching.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
alm3 NDArray[complex128]

The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def gaunt_integrate(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128] | float:
    r"""Perform the triple integral by doing and inplace
    sum-product over gaunt coefficients.

    $$
    \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}.
    $$

    Here the sum is performed inplace with no caching.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    alm3 : npt.NDArray[np.complex128]
        The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    return gaunt_dot123(
        alm1=alm1,
        alm2=alm2,
        alm3=alm3,
        sum_m1=sum_m1,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )

generate_gaunt_cache(cache_type_tag='generic')

Generates the Gaunt coefficient cache.

Source code in src/serval/core.py
def generate_gaunt_cache(
    self, cache_type_tag: Literal["opt12", "generic"] = "generic"
):
    """Generates the Gaunt coefficient cache."""
    self.gaunt_cache = self._cache_types[cache_type_tag](
        self.l1max,
        self.l2max,
        self.l3max,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )

generate_integrator_cache_12(alm1, alm2, contract3=None, release_gaunt_cache=True, batch_parallel_mode='channel-opt')

summary

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

description

required
alm2 NDArray[complex128]

description

required
contract3 NDArray[complex128] | None

description, by default None

None
release_gaunt_cache bool

description, by default True

True
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt']

description, by default "channel-opt"

'channel-opt'
Source code in src/serval/core.py
def generate_integrator_cache_12(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    contract3: npt.NDArray[np.complex128] | None = None,
    release_gaunt_cache: bool = True,
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel-opt",
):
    """_summary_

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        _description_
    alm2 : npt.NDArray[np.complex128]
        _description_
    contract3 : npt.NDArray[np.complex128] | None, optional
        _description_, by default None
    release_gaunt_cache : bool, optional
        _description_, by default True
    batch_parallel_mode : Literal["channel-opt", "channel", "gaunt"], optional
        _description_, by default "channel-opt"
    """
    cache_type_tag: Literal["opt12", "generic"] = (
        "opt12" if batch_parallel_mode == "channel-opt" else "generic"
    )
    if self.gaunt_cache is None:
        self.generate_gaunt_cache(cache_type_tag)
    if not isinstance(self.gaunt_cache, self._cache_types[cache_type_tag]):
        raise TypeError(f"gaunt_cache must be a {self._cache_types[cache_type_tag]} object.")
    assert self.gaunt_cache is not None
    alm1 = extend_dimensions_if_one_batch(alm1, 3)
    alm2 = extend_dimensions_if_one_batch(alm2, 3)
    result = np.zeros(
        (
            alm1.shape[0],
            len(self.m1_values),
            self.l3max + 1,
            2 * self.l3max + 1,
        ),
        dtype=np.complex128,
    )
    if batch_parallel_mode in ["channel", "channel-opt"]:
        self.gaunt_cache.batch_parallel_batch_dot12(alm1, alm2, result)
    else:
        self.gaunt_cache.gaunt_parallel_batch_dot12(alm1, alm2, result)
    if release_gaunt_cache:
        self.clear_gaunt_cache()
    if contract3 is not None:
        result = integrator12_contract3(result, contract3)
    if result.shape[0] == 1:
        self.linear_integrator_cache_12 = result[0, ...]
    else:
        self.linear_integrator_cache_12 = result

grid_integrate(alm1, alm2, alm3, sum_m1=False)

Perform the triple integral using a grid-based approach.

Here the product of the second and third fields are evaluated on a consistent resolution grid, decomposed to conjugate spherical harmonics and multipled by the first set of coefficients. This more standard m-mode approach is performant for single integrals but, as implemented here, can't cache intermediate results and therefore not suitable for batching.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
alm3 NDArray[complex128]

The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def grid_integrate(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128]:
    r"""Perform the triple integral using a grid-based approach.

    Here the product of the second and third fields are evaluated on a
    consistent resolution grid, decomposed to conjugate spherical
    harmonics and multipled by the first set of coefficients. This more
    standard m-mode approach is performant for single integrals
    but, as implemented here, can't cache intermediate results and
    therefore not suitable for batching.


    Parameters
    ---------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    alm3 : npt.NDArray[np.complex128]
        The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    grid_lmax = int(np.ceil(self.l1max + self.l2max + self.l3max))
    alm1, alm2, alm3 = broadcast_bandlimits(alm1, alm2, alm3, lmax=grid_lmax)
    static_grid = grid_template(grid_lmax)
    static_grid.data[:] = 1.0
    static_grid *= array_synthesis(alm2) * array_synthesis(alm3)
    static_alm = analysis(static_grid, conjugate=True, return_sparse=False)
    out_mmodes = compute_mmodes(static_alm, alm1)
    return out_mmodes[np.asarray(self.m1_values) + grid_lmax]

linear_gaunt_integrator_12(alm1, alm2, contract3=None, sum_m1=False)

Generate a linear operator that, with fixed first and second set of coefficients performs the integral when sum-producted with the third set of coefficients.

\[ \matr{L}_{m_1}^{\ell_3 m_3} = \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}} \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~ a_{\ell_1 m_1}a_{\ell_2 m_2}. \]

If requested, additionally contract with a linear operator along the m3 axis (e.g. a rotation with a Wigner-d matrix),

\[ \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3} \matr{L}_{m_1}^{\ell_3 m_3} \]

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
contract3 NDArray[complex128] | None

An optional linear operator along the m3 axis, shape (l3max+1, 2l3max+1, Nm3prime<=2l3max+1), by default None

None
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128]

By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array where the m1 axis has been summed over. If contract3 is present, this will instead be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent contraction on the m3 axis.

Source code in src/serval/core.py
def linear_gaunt_integrator_12(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    contract3: npt.NDArray[np.complex128] | None = None,
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128]:
    r"""Generate a linear operator that, with fixed first and second set of coefficients
    performs the integral when sum-producted with the third set of coefficients.

    $$
    \matr{L}_{m_1}^{\ell_3 m_3} =
                \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}}
                \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                a_{\ell_1 m_1}a_{\ell_2 m_2}.
    $$

    If requested, additionally contract with a linear operator along the
    m3 axis (e.g. a rotation with a Wigner-d matrix),

    $$
    \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3}
                                         \matr{L}_{m_1}^{\ell_3 m_3}
    $$

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    contract3 : npt.NDArray[np.complex128] | None, optional
        An optional linear operator along the m3 axis, shape
        (l3max+1, 2*l3max+1, Nm3prime<=2*l3max+1), by default None
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128]
        By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array
        where the m1 axis has been summed over. If contract3 is present, this will instead
        be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent
        contraction on the m3 axis.
    """
    tensor = gaunt_dot12(
        alm1=alm1,
        alm2=alm2,
        l3max=self.l3max,
        sum_m1=sum_m1,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )
    if contract3 is not None:
        return integrator12_contract3(tensor, contract3)
    else:
        return tensor

sparse_gaunt_einsum(sparse_alm1, sparse_alm2, sparse_alm3, sum_m1=False, gaunts=None)

Perform the triple integral using a sparse tensor contraction.

Compute the triple integral by generating or using a sparse representation of the Gaunt coefficients and performing the sum with a sparse einsum. This is mostly a cross-checking function and is not performant.

Parameters:

Name Type Description Default
sparse_alm1 SparseArray

Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).

required
sparse_alm2 SparseArray

Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).

required
sparse_alm3 SparseArray

Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False
gaunts COO | None

Sparse representation of the Gaunt coefficients if already. computed, by default None.

None

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def sparse_gaunt_einsum(
    self,
    sparse_alm1: sparse.SparseArray,
    sparse_alm2: sparse.SparseArray,
    sparse_alm3: sparse.SparseArray,
    sum_m1: bool = False,
    gaunts: sparse.COO | None = None,
) -> sparse.COO:
    r"""
    Perform the triple integral using a sparse tensor contraction.

    Compute the triple integral by generating or using a sparse
    representation of the Gaunt coefficients and performing the sum
    with a sparse einsum. This is mostly a cross-checking function
    and is not performant.

    Parameters
    ----------
    sparse_alm1 : sparse.SparseArray
        Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).
    sparse_alm2 : sparse.SparseArray
        Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).
    sparse_alm3 : sparse.SparseArray
        Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.
    gaunts : sparse.COO | None, optional
        Sparse representation of the Gaunt coefficients if already.
        computed, by default None.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    if gaunts is None:
        gaunts = gaunts_coo(self.l1max, self.l2max, self.l3max)
    if sum_m1:
        return sparse.einsum(
            "ijklmn, ij, kl, mn -> ", gaunts, sparse_alm1, sparse_alm2, sparse_alm3
        )
    else:
        return sparse.einsum(
                "ijklmn, ij, kl, mn -> j",
                gaunts,
                sparse_alm1,
                sparse_alm2,
                sparse_alm3,
            ).todense()

VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
frequencies_MHz ndarray
required
power_beam_lmax int | None
None
baseline_lmax int | None
None
sky_lmax int | None
None
sky_absm_limits list[Optional[int]]
[0, None]
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt']
'channel'

Attributes:

Name Type Description
baseline_mmax int | None
baseline_cache ndarray | None
integrator_cache ndarray | None
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class VisProjector:
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    frequencies_MHz: np.ndarray

    power_beam_lmax: int | None = None
    baseline_lmax: int | None = None
    baseline_mmax: int | None = field(init=False, default=None)
    sky_lmax: int | None = None

    sky_absm_limits: list[int | None] = field(factory=lambda: [0, None])

    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel"

    baseline_cache: np.ndarray | None = field(init=False, default=None)
    integrator_cache: np.ndarray | None = field(init=False, default=None)

    triple_sh_integrator: TripleSHIntegrator = field(init=False)

    _last_store_location: Path | None = field(init=False, default=None)
    _last_group_path: str | None = field(init=False, default=None)

    _to_attrs: ClassVar[list[str]] = []
    _to_zarr_store: ClassVar[list[str]] = []
    _opt_tag: ClassVar[str] = ""

    def update_integrator(self):
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )

    def generate_gaunt_cache(self):
        if self.baseline_cache is None:
            self.generate_baseline_cache()
        self.update_integrator()
        if self.batch_parallel_mode == "channel-opt":
            self.triple_sh_integrator.generate_gaunt_cache(self._opt_tag)
        else:
            self.triple_sh_integrator.generate_gaunt_cache("generic")

    @property
    def sky_m_values(self):
        return self.triple_sh_integrator.m1_values

    @property
    def sky_m_index_map(self):
        return self.triple_sh_integrator.m1_index_map

    @property
    def sky_m_global_index(self):
        return self.triple_sh_integrator.m1_global_index

    @property
    def sky_m_index_slices(self):
        m_inds = self.sky_m_global_index
        nm_inds = m_inds[: len(m_inds) // 2]
        if len(nm_inds) != 0:  # If ms == [0,]
            pm_inds = m_inds[len(m_inds) // 2 :]
            return (
                [slice(0, self.num_sky_ms // 2), slice(self.num_sky_ms // 2, None)],
                [
                    slice(nm_inds[0], nm_inds[-1] + 1),
                    slice(pm_inds[0], pm_inds[-1] + 1),
                ],
            )
        else:
            return [
                slice(0, 1),
            ], [
                slice(m_inds[0], m_inds[0] + 1),
            ]

    @property
    def num_sky_ms(self):
        return len(self.sky_m_values)

    def _setup_zarr_store(
        self,
        store_location: str | Path,
        integrator_cache_shape: tuple[int, ...],
        integrator_cache_chunks: tuple[int, ...],
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        to_attrs_dict = {slot: getattr(self, slot) for slot in self._to_attrs}
        to_zarr_store_dict = {slot: getattr(self, slot) for slot in self._to_zarr_store}

        destination_dir = Path(store_location)

        self._last_store_location = destination_dir
        self._last_group_path = group_path

        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.create_group(store=store, path=group_path, overwrite=True)

        assert self.sky_lmax is not None

        if store_metadata_attrs is not None:
            if group_path != r"/":  # coulf check this more thoroughly
                root = zarr.group(store=store, path="/")
                root.attrs.update(store_metadata_attrs)
            else:
                grp.attrs.update(store_metadata_attrs)

        for k, v in to_attrs_dict.items():
            grp.attrs[k] = v

        for k, v in to_zarr_store_dict.items():
            arr = grp.create_array(
                name=k,
                shape=v.shape,
                dtype=v.dtype,
                overwrite=True,
            )
            arr[:] = v

        ms_completed = grp.create_array(
            name="ms_completed",
            chunks=1,
            shape=2 * self.sky_lmax + 1,
            dtype=bool,
            overwrite=True,
        )
        ms_completed[:] = False

        grp.create_array(
            name="integrator_cache",
            shape=integrator_cache_shape,
            chunks=integrator_cache_chunks,
            dtype=np.complex128,
            overwrite=True,
            compressors=zarr.codecs.BloscCodec(
                cname="blosclz", clevel=9, shuffle=zarr.codecs.BloscShuffle.bitshuffle
            ),
        )

    def write_integrator_cache_to_store(
        self, store_location: str | Path | None = None, group_path: str | None = None
    ):
        if self.integrator_cache is None:
            raise ValueError("Cannot store integrator_cache because it is None.")
        if store_location is None and self._last_store_location is not None:
            destination_dir = self._last_store_location
        elif store_location is not None:
            destination_dir = Path(store_location)
        else:
            raise ValueError("Store location must be specified.")
        if group_path is None and self._last_group_path is not None:
            group_path = self._last_group_path
        elif group_path is None:
            group_path = r"/"  # Default to root group
        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.group(store=store, path=group_path)
        integrator_cache = grp["integrator_cache"]
        ms_completed = grp["ms_completed"]

        local_m_slices, global_m_slices = self.sky_m_index_slices
        for lslc, gslc in zip(local_m_slices, global_m_slices):  # noqa: B905
            integrator_cache[:, gslc] = self.integrator_cache[:, lslc]
            ms_completed[gslc] = True

    @classmethod
    def from_zarr_store(
        cls,
        store_location: str | Path,
        group_path: str = r"/",
        freq_slice: slice | None = None,
        sky_absm_limits=(0, None),
        load_cache=True,
    ):
        if freq_slice is None:
            freq_slice = slice(None, None, None)
        destination_dir = Path(store_location)
        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.group(store=store, path=group_path)
        init_dict = {k: grp.attrs[k] for k in cls._to_attrs}
        init_dict.update({k: grp[k][freq_slice] for k in cls._to_zarr_store})
        proj = cls(sky_absm_limits=sky_absm_limits, **init_dict)
        if load_cache:
            local_m_slices, global_m_slices = proj.sky_m_index_slices
            cache_shape = (
                proj.frequencies_MHz.size,
                len(proj.sky_m_values),
            ) + grp[
                "integrator_cache"
            ].shape[2:]
            proj.integrator_cache = np.zeros(cache_shape, dtype=np.complex128)
            for lslc, gslc in zip(local_m_slices, global_m_slices):
                # Load from global slice into local slice
                grp["integrator_cache"].get_basic_selection(
                    (freq_slice, gslc, ...),
                    out=NDBuffer(proj.integrator_cache[:, lslc, ...]),
                )
                if not grp["ms_completed"][gslc].all():
                    warnings.warn(
                        "Zarr Store indicates not all requested sky ms computed.",
                        stacklevel=2,
                    )  # points to caller
        return proj

extend_dimensions_if_one_batch(arr, dim)

If a single frequency is calculated then the alms and cache matrices will be missing the first dimension that corresponds to the number of frequencies. This function takes the arrays and adds a (1, ...) dimension

Parameters:

Name Type Description Default
arr NDArray[complex128]

The input array.

required
dim int

The desired number of dimensions.

required

Returns:

Type Description
NDArray[complex128]

The array with the correct number of dimensions.

Source code in src/serval/core.py
def extend_dimensions_if_one_batch(
    arr: npt.NDArray[np.complex128], dim: int
) -> npt.NDArray[np.complex128]:
    """If a single frequency is calculated then the alms and cache matrices will be missing
    the first dimension that corresponds to the number of frequencies. This function takes the
    arrays and adds a (1, ...) dimension
    Parameters
    ----------
    arr : npt.NDArray[np.complex128]
        The input array.
    dim : int
        The desired number of dimensions.
    Returns
    -------
    npt.NDArray[np.complex128]
        The array with the correct number of dimensions.
    """
    if arr.ndim < dim:
        arr = arr[None, ...]
    return arr

Rotation Utilities (serval.rotate.py)

enu_to_tirs(latitude, longitude)

Creates a rotation object for basis rotations from the ENU coordinate frame of an observer at the specified latitude and longitude to the TIRS frame.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required

Returns:

Type Description
Rotation

A Rotation object representing the ENU to TIRS basis rotation.

Source code in src/serval/rotate.py
def enu_to_tirs(latitude: float, longitude: float) -> Rotation:
    r"""Creates a rotation object for basis rotations from the ENU coordinate frame of an observer
    at the specified latitude and longitude to the TIRS frame.
    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    Returns
    -------
    Rotation
        A Rotation object representing the ENU to TIRS basis rotation.
    """
    zenith_to_tirs = tirs_to_zenith(latitude=latitude, longitude=longitude).inv()
    return zenith_to_tirs * enu_to_zenith()

enu_to_zenith()

Provides a rotation object converting from the ENU basis to the local Zenith basis.

Returns:

Type Description
Rotation

A Rotation object.

Source code in src/serval/rotate.py
def enu_to_zenith() -> Rotation:
    r"""Provides a rotation object converting from the ENU basis
    to the local Zenith basis.
    Returns
    -------
    Rotation
        A Rotation object.
    """
    # fmt: off
    return Rotation.from_matrix([
        [0, -1, 0],
        [1,  0, 0],
        [0,  0, 1]]
    )

eulers_from_rotation(rotation)

Extracts the ZYZ Euler angles from a rotation object.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
rotation Rotation

A Rotation object.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the rotation.

Source code in src/serval/rotate.py
def eulers_from_rotation(rotation: Rotation) -> EulersType:
    r"""Extracts the ZYZ Euler angles from a rotation object.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    rotation : Rotation
        A Rotation object.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the rotation.
    """
    # .inv() for passive convention
    return tuple(rotation.inv().as_euler("ZYZ"))

generate_telescope_wigner_ds(beam_lmax, beam_mmax, baseline_lmax, latitude, longitude, altitude, azimuth, baseline_enu)

Generates Wigner-D matrices for the TIRS to telescope pointing frame and T IRS to baseline frame basis rotations.

Parameters:

Name Type Description Default
beam_lmax int

The maximum degree of the beam Wigner-D matrix.

required
beam_mmax int

The maximum order of m' for the beam Wigner-D matrix.

required
baseline_lmax int

The maximum degree of the baseline Wigner-D matrix.

required
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
altitude float

The altitude angle in radians.

required
azimuth float

The azimuth angle in radians.

required
baseline_enu tuple[float, float, float]

The baseline vector components in the ENU frame (east, north, up).

required

Returns:

Type Description
tuple[NDArray[complex128], NDArray[complex128]]

The Wigner-D matrices for the TIRS to telescope pointing frame and TIRS to baseline frame basis rotations.

Source code in src/serval/rotate.py
def generate_telescope_wigner_ds(
    beam_lmax: int,
    beam_mmax: int,
    baseline_lmax: int,
    latitude: float,
    longitude: float,
    altitude: float,
    azimuth: float,
    baseline_enu: tuple[float, float, float],
) -> tuple[npt.NDArray[np.complex128], npt.NDArray[np.complex128]]:
    r"""Generates Wigner-D matrices for the TIRS to telescope pointing frame and T
    IRS to baseline frame basis rotations.
    Parameters
    ----------
    beam_lmax : int
        The maximum degree of the beam Wigner-D matrix.
    beam_mmax : int
        The maximum order of m' for the beam Wigner-D matrix.
    baseline_lmax : int
        The maximum degree of the baseline Wigner-D matrix.
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    altitude : float
        The altitude angle in radians.
    azimuth : float
        The azimuth angle in radians.
    baseline_enu : tuple[float, float, float]
        The baseline vector components in the ENU frame (east, north, up).
    Returns
    -------
    tuple[npt.NDArray[np.complex128], npt.NDArray[np.complex128]]
        The Wigner-D matrices for the TIRS to telescope pointing frame and TIRS to baseline
        frame basis rotations.
    """
    print("Generating Pointing Wigner-D...")
    wigner_d_pointing = wigner_d(
        beam_lmax,
        beam_mmax,
        tirs_to_pointing(
            latitude=latitude, longitude=longitude, altitude=altitude, azimuth=azimuth
        ),
    )
    print("Generating Baseline Wigner-D...")
    wigner_d_baseline = wigner_d(
        baseline_lmax,
        0,
        tirs_to_baseline_direc(
            latitude=latitude,
            longitude=longitude,
            east=baseline_enu[0],
            north=baseline_enu[1],
            up=baseline_enu[2],
        ),
    )
    return wigner_d_pointing, wigner_d_baseline

inv_eulers(eulers)

Provides the Euler angles corresponding to the inverse rotation of those given.

Parameters:

Name Type Description Default
eulers tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the rotation.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the inverse rotation.

Source code in src/serval/rotate.py
def inv_eulers(eulers: EulersType) -> EulersType:
    r"""Provides the Euler angles corresponding to the inverse rotation of those given.
    Parameters
    ----------
    eulers : tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the rotation.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the inverse rotation.
    """
    return (-eulers[2], -eulers[1], -eulers[0])

rotation_from_eulers(eulers, degrees=False)

Creates a rotation object from a set of Euler angles.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
eulers tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the rotation.

required
degrees bool

Whether the input angles are in degrees, by default False.

False

Returns:

Type Description
Rotation

The Rotation object.

Source code in src/serval/rotate.py
def rotation_from_eulers(
    eulers: EulersType,
    degrees: None | bool = False,
) -> Rotation:
    r"""Creates a rotation object from a set of Euler angles.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    eulers : tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the rotation.
    degrees : bool, optional
        Whether the input angles are in degrees, by default False.
    Returns
    -------
    Rotation
        The Rotation object.
    """
    # .inv() for passive convention
    return Rotation.from_euler("ZYZ", eulers, degrees=degrees).inv()

tirs_to_baseline_direc(latitude, longitude, east, north, up)

Creates a rotation object for basis rotations from the TIRS frame to a frame aligned with the direction of the baseline vector.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to baseline direction frame basis rotation.

Source code in src/serval/rotate.py
def tirs_to_baseline_direc(
    latitude: float, longitude: float, east: float, north: float, up: float
) -> Rotation:
    r"""Creates a rotation object for basis rotations from the TIRS frame to a
    frame aligned with the direction of the baseline vector.
    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to baseline direction frame basis rotation.
    """
    return zenith_to_baseline_direc(east=east, north=north, up=up) * tirs_to_zenith(
        latitude=latitude, longitude=longitude
    )

tirs_to_baseline_direc_eulers(latitude, longitude, east, north, up)

Returns the Euler angles for basis rotations from the TIRS frame to a frame aligned with the direction of the baseline vector.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to baseline frame basis rotation.

Source code in src/serval/rotate.py
def tirs_to_baseline_direc_eulers(
    latitude: float, longitude: float, east: float, north: float, up: float
) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the TIRS frame to a
    frame aligned with the direction of the baseline vector.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to baseline frame
         basis rotation.
    """
    return eulers_from_rotation(
        tirs_to_baseline_direc(
            latitude=latitude, longitude=longitude, east=east, north=north, up=up
        )
    )

tirs_to_cirs(era)

Creates a rotation object corresponding to the basis rotation from the TIRS frame to the CIRS frame.

Parameters:

Name Type Description Default
era float

The Earth Rotation Angle in radians.

required

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to CIRS basis rotation.

Source code in src/serval/rotate.py
def tirs_to_cirs(era: float) -> Rotation:
    r"""Creates a rotation object corresponding to the basis rotation
    from the TIRS frame to the CIRS frame.
    Parameters
    ----------
    era : float
        The Earth Rotation Angle in radians.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to CIRS basis rotation.
    """
    return rotation_from_eulers(tirs_to_cirs_eulers(era))

tirs_to_cirs_eulers(era)

Provides the Euler angles corresponding to the basis rotation from the TIRS frame to the CIRS frame.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
era float

The Earth Rotation Angle in radians.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to CIRS basis rotation.

Source code in src/serval/rotate.py
def tirs_to_cirs_eulers(era: float) -> EulersType:
    r"""Provides the Euler angles corresponding to the basis rotation
    from the TIRS frame to the CIRS frame.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    era : float
        The Earth Rotation Angle in radians.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to
        CIRS basis rotation.
    """
    return (-era, 0, 0)  # See USNO Circular 179

tirs_to_pointing(latitude, longitude, altitude=np.pi / 2, azimuth=np.pi, boresight=0.0)

Creates a rotation object for basis rotations from the TIRS frame to a frame aligned with the telescope pointing direction.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
altitude float

The altitude angle in radians, by default np.pi / 2.

pi / 2
azimuth float

The azimuth angle in radians, by default np.pi.

pi
boresight float

The boresight angle in radians, by default 0.0.

0.0

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to telescope pointing frame basis rotation.

Source code in src/serval/rotate.py
def tirs_to_pointing(
    latitude: float,
    longitude: float,
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
    boresight: float = 0.0,
) -> Rotation:
    r"""Creates a rotation object for basis rotations from the TIRS frame to a frame aligned with
    the telescope pointing direction.
    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    altitude : float, optional
        The altitude angle in radians, by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.0.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to telescope pointing frame basis rotation.
    """
    return zenith_to_pointing(
        altitude=altitude, azimuth=azimuth, boresight=boresight
    ) * tirs_to_zenith(latitude=latitude, longitude=longitude)

tirs_to_pointing_eulers(latitude, longitude, altitude=np.pi / 2, azimuth=np.pi, boresight=0.0)

Returns the Euler angles for basis rotations from the TIRS frame to a frame aligned with the telescope pointing direction.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
altitude float

The altitude angle in radians, by default np.pi / 2.

pi / 2
azimuth float

The azimuth angle in radians, by default np.pi.

pi
boresight float

The boresight angle in radians, by default 0.0.

0.0

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to telescope pointing basis rotation.

Source code in src/serval/rotate.py
def tirs_to_pointing_eulers(
    latitude: float,
    longitude: float,
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
    boresight: float = 0.0,
) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the TIRS frame to a frame aligned
    with the telescope pointing direction.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    altitude : float, optional
        The altitude angle in radians, by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.0.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to telescope pointing
        basis rotation.
    """
    return eulers_from_rotation(
        tirs_to_pointing(
            latitude=latitude,
            longitude=longitude,
            altitude=altitude,
            azimuth=azimuth,
            boresight=boresight,
        )
    )

tirs_to_zenith(latitude, longitude)

Creates a rotation object for basis rotations from the TIRS frame to the local Zenith frame for an observer at a given latitude and longitude.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to local Zenith basis rotation.

Source code in src/serval/rotate.py
def tirs_to_zenith(latitude: float, longitude: float) -> Rotation:
    r"""Creates a rotation object for basis rotations from the TIRS frame to the local Zenith frame
    for an observer at a given latitude and longitude.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to local Zenith basis rotation.
    """
    return rotation_from_eulers(
        tirs_to_zenith_eulers(latitude=latitude, longitude=longitude),
    )

tirs_to_zenith_eulers(latitude, longitude)

Returns the Euler angles for basis rotations from the TIRS frame to the local Zenith frame for an observer at a given latitude and longitude.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to local Zenith basis rotation.

Source code in src/serval/rotate.py
def tirs_to_zenith_eulers(latitude: float, longitude: float) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the TIRS frame to the local Zenith frame
    for an observer at a given latitude and longitude.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to local Zenith
        basis rotation.
    """
    return (longitude, np.pi / 2 - latitude, 0.0)

wigner_d(lmax, mprime_max, eulers_or_rotation)

Generates Wigner-D matrices up to a given lmax for specified Euler angles or a Rotation object.

Parameters:

Name Type Description Default
lmax int

The maximum degree of the Wigner-D matrices.

required
mprime_max int

The maximum order of m' for the Wigner-D matrices.

required
eulers_or_rotation tuple[float, float, float] | Rotation

The Euler angles (alpha, beta, gamma) specifying the rotation, or a Rotation object.

required

Returns:

Type Description
NDArray[complex128]

An array of shape (lmax+1, 2mprime_max+1, 2lmax+1) containing the Wigner-D matrices.

Source code in src/serval/rotate.py
def wigner_d(
    lmax: int,
    mprime_max: int,
    eulers_or_rotation: EulersType | Rotation,
) -> npt.NDArray[np.complex128]:
    r"""Generates Wigner-D matrices up to a given lmax for specified Euler angles or
    a Rotation object.
    Parameters
    ----------
    lmax : int
        The maximum degree of the Wigner-D matrices.
    mprime_max : int
        The maximum order of m' for the Wigner-D matrices.
    eulers_or_rotation : tuple[float, float, float] | Rotation
        The Euler angles (alpha, beta, gamma) specifying the rotation,
        or a Rotation object.
    Returns
    -------
    npt.NDArray[np.complex128]
        An array of shape (lmax+1, 2*mprime_max+1, 2*lmax+1) containing the Wigner-D matrices.
    """
    if isinstance(eulers_or_rotation, Rotation):
        eulers = eulers_from_rotation(eulers_or_rotation)
    else:
        eulers = eulers_or_rotation

    # One-hot-dot reconstruction of wigner matrix through per-mprime transforming
    # To rotate, you contract on the first axes of the wigner-d matrix
    out_wigner = np.zeros(
        shape=(lmax + 1, 2, mprime_max + 1, 2, lmax + 1), dtype=np.complex128
    )
    one_hot = pysh.SHCoeffs.from_zeros(lmax=lmax, kind="complex", **SHT_CONVENTIONS)
    for m in range(0, mprime_max + 1):
        one_hot.coeffs[:] = 0.0
        one_hot.coeffs[0, m:, m] = 1.0
        rotted = one_hot.rotate(*eulers, **ROTATE_CONVENTIONS)
        out_wigner[:, 0, m, ...] = np.swapaxes(rotted.coeffs[:, :, :], 0, 1)
    m1 = np.stack(
        [np.arange(0, out_wigner.shape[2]), -np.arange(0, out_wigner.shape[2])], axis=0
    )[None, :, :, None, None]
    m2 = np.stack(
        [np.arange(0, out_wigner.shape[-1]), -np.arange(0, out_wigner.shape[-1])],
        axis=0,
    )[None, None, None, :, :]
    sym_factor = (-1.0) ** (m1 + m2)
    out_wigner[:, 1, 1:, 0, 0] = (out_wigner * sym_factor)[:, 0, 1:, 0, 0].conj()
    out_wigner[:, 1, 1:, 0, 1:] = (out_wigner * sym_factor)[:, 0, 1:, 1, 1:].conj()
    out_wigner[:, 1, 1:, 1, 1:] = (out_wigner * sym_factor)[:, 0, 1:, 0, 1:].conj()

    # Do stacking of ms
    neg_mprimes = out_wigner[:, 1, :0:-1, ...]
    pos_mprimes = out_wigner[:, 0, ...]
    out_wigner = np.concatenate([neg_mprimes, pos_mprimes], axis=1)

    neg_ms = out_wigner[..., 1, :0:-1]
    pos_ms = out_wigner[..., 0, :]

    return np.concatenate([neg_ms, pos_ms], axis=-1)

zenith_to_baseline_direc(east, north, up)

Creates a rotation object for basis rotations from the local Zenith frame to a frame aligned with the direction of the baseline vector.

Parameters:

Name Type Description Default
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
Rotation

A Rotation object representing the local Zenith to baseline direction frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_baseline_direc(east: float, north: float, up: float) -> Rotation:
    r"""Creates a rotation object for basis rotations from the local Zenith frame
    to a frame aligned with the direction of the baseline vector.

    Parameters
    ----------
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    Rotation
        A Rotation object representing the local Zenith to baseline direction
        frame basis rotation.
    """
    return rotation_from_eulers(
        zenith_to_baseline_direc_eulers(east=east, north=north, up=up),
    )

zenith_to_baseline_direc_eulers(east, north, up)

Returns the Euler angles for basis rotations from the local Zenith frame to a frame aligned with the direction of the baseline vector.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the local Zenith to baseline direction frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_baseline_direc_eulers(east: float, north: float, up: float) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the local Zenith frame
    to a frame aligned with the direction of the baseline vector.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the local Zenith to baseline direction
        frame basis rotation.
    """
    norm = np.linalg.norm([east, north, up])
    if norm == 0.0:
        return (0.0, 0.0, 0.0)
    else:
        return (
            -np.arctan2(east / norm, north / norm),
            np.arcsin(up / norm) - np.pi / 2,
            0.0,
        )

zenith_to_pointing(altitude=np.pi / 2, azimuth=np.pi, boresight=0.0)

Creates a rotation object from the local Zenith frame to the to a frame aligned with the telescope pointing direction.

Parameters:

Name Type Description Default
altitude float

The altitude angle in radians , by default np.pi / 2.

pi / 2
azimuth float

The azimuth angle in radians, by default np.pi.

pi
boresight float

The boresight angle in radians, by default 0.

0.0

Returns:

Type Description
Rotation

A Rotation object representing the local Zenith to telescope pointing frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_pointing(
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
    boresight: float = 0.0,
) -> Rotation:
    r"""Creates a rotation object from the local Zenith frame to the to a frame aligned with
    the telescope pointing direction.
    Parameters
    ----------
    altitude : float, optional
        The altitude angle in radians , by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.
    Returns
    -------
    Rotation
        A Rotation object representing the local Zenith to telescope pointing frame basis rotation.
    """
    return rotation_from_eulers(
        zenith_to_pointing_eulers(
            altitude=altitude, azimuth=azimuth, boresight=boresight
        ),
    )

zenith_to_pointing_eulers(altitude=None, azimuth=None, boresight=None)

Returns the Euler angles for basis rotations from the local Zenith frame to a frame aligned with the telescope pointing direction.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
altitude float

The altitude angle in radians, by default np.pi / 2.

None
azimuth float

The azimuth angle in radians, by default np.pi.

None
boresight float

The boresight angle in radians, by default 0.0.

None

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the local Zenith to pointing frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_pointing_eulers(
    altitude: float | None = None,
    azimuth: float | None = None,
    boresight: float | None = None,
) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the local Zenith frame
    to a frame aligned with the telescope pointing direction.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    altitude : float, optional
        The altitude angle in radians, by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.0.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the local Zenith to pointing
        frame basis rotation.
    """
    altitude = np.pi / 2 if altitude is None else altitude  # Default
    azimuth = np.pi if azimuth is None else azimuth  # Default
    boresight = 0.0 if boresight is None else boresight  # Default
    return (-azimuth, altitude - np.pi / 2, np.pi + boresight)

Spherical Harmonic Transform Utilities (serval.sht.py)

alm_from_healpix(healpix_alm, return_sparse=True)

Transforms spherical harmonics coefficients from healpy format (1D) to standard Fourier order format (2D). It can return either a sparse or dense array.

Parameters:

Name Type Description Default
healpix_alm NDArray[complex128]

A spherical harmonics coefficients array in healpy format (1D).

required
return_sparse bool

Whether to return a sparse array or a dense one.

True

Returns:

Type Description
COO | NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order (2D), in sparse or dense form.

Source code in src/serval/sht.py
def alm_from_healpix(
    healpix_alm: npt.NDArray[np.complex128], return_sparse: bool = True
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Transforms spherical harmonics coefficients from healpy format (1D)
    to standard Fourier order format (2D).
    It can return either a sparse or dense array.
    Parameters
    ----------
    healpix_alm : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in healpy format (1D).
    return_sparse : bool, optional
        Whether to return a sparse array or a dense one.
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order (2D),
        in sparse or dense form.
    """
    import healpy as hp  # Make optional dependency?

    lmax = hp.Alm.getlmax(healpix_alm.size)
    ell, m = hp.Alm.getlm(lmax)
    sparse_alm = sparse.COO(
        [ell, m + lmax], healpix_alm, shape=(lmax + 1, 2 * lmax + 1)
    )
    # Put in redundant part for real map.
    sparse_alm += sparse.COO(
        [ell[m > 0], lmax - m[m > 0]],
        (-1) ** (m[m > 0]) * healpix_alm[m > 0].conj(),
        shape=(lmax + 1, 2 * lmax + 1),
    )
    if return_sparse:
        return sparse_alm
    else:
        return sparse_alm.todense()

analysis(grid, threshold=SPARSE_THRESHOLD, conjugate=False, return_sparse=True)

Transforms a pyshtools grid to the corresponding spherical harmonics coefficients or their conjugates in standard Fourier order. It can return either a sparse or dense array.

Parameters:

Name Type Description Default
grid SHGrid

A spherical harmonic grid object.

required
threshold float

The threshold below which coefficients are set to zero.

SPARSE_THRESHOLD
conjugate bool

Whether to return the conjugate spherical harmonics coefficients.

False
return_sparse bool

Whether to return a sparse array or a dense one.

True

Returns:

Type Description
COO | NDArray[complex128]

The spherical harmonics coefficients array or their conjugates in standard Fourier order, in sparse or dense form.

Source code in src/serval/sht.py
def analysis(
    grid: pysh.SHGrid,
    threshold: float = SPARSE_THRESHOLD,
    conjugate: bool = False,
    return_sparse: bool = True,
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Transforms a pyshtools grid to the corresponding spherical harmonics coefficients or their
    conjugates in standard Fourier order. It can return either a sparse or dense array.
    Parameters
    ----------
    grid : pysh.SHGrid
        A spherical harmonic grid object.
    threshold : float, optional
        The threshold below which coefficients are set to zero.
    conjugate : bool, optional
        Whether to return the conjugate spherical harmonics coefficients.
    return_sparse : bool, optional
        Whether to return a sparse array or a dense one.
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        The spherical harmonics coefficients array or their conjugates in standard Fourier order,
        in sparse or dense form.
    """
    sparse_alm = to_sparse(grid.expand(**SHT_CONVENTIONS), threshold=threshold)
    if conjugate:
        out = make_conjugate(sparse_alm)
    else:
        out = sparse_alm
    if return_sparse:
        return out
    else:
        assert isinstance(out, sparse.COO)
        return out.todense()

array_synthesis(array_coeff)

Transfomrs spherical harmonics coefficients from a pyshtools array to a pyshtools grid.

Parameters:

Name Type Description Default
array_coeff NDArray[complex128]

A spherical harmonics coefficients array in pytshtools format. Its shape should be (lmax + 1, 2 * mmax + 1).

required

Returns:

Type Description
SHGrid

A spherical harmonic grid object.

Source code in src/serval/sht.py
def array_synthesis(array_coeff: npt.NDArray[np.complex128]) -> pysh.SHGrid:
    r"""Transfomrs spherical harmonics coefficients from a pyshtools array to a pyshtools grid.
    Parameters
    ----------
    array_coeff : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in pytshtools format. Its shape
        should be (lmax + 1, 2 * mmax + 1).
    Returns
    -------
    pysh.SHGrid
        A spherical harmonic grid object.
    """
    return pysh.SHCoeffs.from_array(unstack_ms(array_coeff), **SHT_CONVENTIONS).expand(
        grid=GRID_CONVENTIONS["grid"], extend=GRID_CONVENTIONS["extend"]
    )

broadcast_bandlimits(*coeffs, lmax=None)

Broadcasts a set of spherical harmonics coefficients to common bandlimits. These are either given or inferred from the maximum lmax among the inputs.

Parameters:

Name Type Description Default
*coeffs Coeffs

A variable set of sparse or dense spherical harmonics coefficients arrays in standard Fourier order. The shape or array i in the set should be (lmax_i + 1, 2 * mmax_i + 1) for each i.

()
lmax int | None

The target maximum spherical harmonic degree. If None, it is inferred from the maximum lmax among the input coefficients.

None

Returns:

Type Description
tuple[Coeffs, ...]

A tuple containing the input spherical harmonics coefficients arrays reshaped to the common target bandlimits.

Source code in src/serval/sht.py
def broadcast_bandlimits(
    *coeffs: Coeffs, lmax: int | None = None
) -> tuple[Coeffs, ...]:
    r"""Broadcasts a set of spherical harmonics coefficients to common bandlimits. These are either
    given or inferred from the maximum lmax among the inputs.
    Parameters
    ----------
    *coeffs : Coeffs
        A variable set of sparse or dense spherical harmonics coefficients arrays
        in standard Fourier order. The shape or array i in the set should be
        (lmax_i + 1, 2 * mmax_i + 1) for each i.
    lmax : int | None, optional
        The target maximum spherical harmonic degree. If None, it is inferred from the maximum
        lmax among the input coefficients.
    Returns
    -------
    tuple[Coeffs, ...]
        A tuple containing the input spherical harmonics coefficients arrays reshaped to the
        common target bandlimits.
    """
    if lmax is None:
        target_lmax = max([coeff.shape[0] - 1 for coeff in coeffs])
    else:
        target_lmax = lmax
    return tuple(set_bandlimits(coeff, lmax=target_lmax) for coeff in coeffs)

compute_mmodes(coeffs1, coeffs2)

Computes the mmodes between two spherical harmonics coefficients sparse or dense arrays. The relative formula is given by:

\[ \mathrm{mmodes}[m] = \sum_{\ell=0}^{l_{\max}} \mathrm{coeffs1}[\ell, m]\, \mathrm{coeffs2}[\ell, m] \]

Parameters:

Name Type Description Default
coeffs1 Coeffs

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax1 + 1, 2 * mmax1 + 1).

required
coeffs2 Coeffs

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax2 + 1, 2 * mmax2 + 1).

required

Returns:

Type Description
Coeffs

A sparse or dense spherical harmonics mmodes array resulting from the inner product of the input coefficients. Its shape will be (2 * max(mmax1, mmax2) + 1,).

Source code in src/serval/sht.py
def compute_mmodes(
    coeffs1: Coeffs,
    coeffs2: Coeffs,
) -> Coeffs:
    r"""Computes the mmodes between two spherical harmonics coefficients sparse or dense arrays.
        The relative formula is given by:

    $$
    \mathrm{mmodes}[m] = \sum_{\ell=0}^{l_{\max}}
        \mathrm{coeffs1}[\ell, m]\,
        \mathrm{coeffs2}[\ell, m]
    $$

    Parameters
    ----------
    coeffs1 : Coeffs
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax1 + 1, 2 * mmax1 + 1).
    coeffs2 : Coeffs
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax2 + 1, 2 * mmax2 + 1).
    Returns
    -------
    Coeffs
        A sparse or dense spherical harmonics mmodes array resulting from the inner product
        of the input coefficients. Its shape will be (2 * max(mmax1, mmax2) + 1,).
    """
    xp = array_namespace(coeffs1, coeffs2)
    expanded_coeffs1, expanded_coeffs2 = broadcast_bandlimits(coeffs1, coeffs2)
    mmodes = xp.sum(expanded_coeffs1 * expanded_coeffs2, axis=0)
    return mmodes

from_sparse(sparse_coeffs, lmax=None)

Transfomrs spherical harmonics coefficients from a sparse array in standard Fourier order to a pyshtools SHCoeffs object of given bandlimits.

Parameters:

Name Type Description Default
sparse_coeffs COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax_sparse + 1, 2 * mmax_sparse + 1).

required
lmax int | None

The target maximum spherical harmonic degree. If None, it is inferred from the input sparse coefficients lmax_sparse.

None

Returns:

Type Description
SHCoeffs

A spherical harmonics coefficients object in pyshtools format. Its shape will be (2, lmax + 1, mmax + 1).

Source code in src/serval/sht.py
def from_sparse(sparse_coeffs: sparse.COO, lmax: int | None = None) -> pysh.SHCoeffs:
    r"""Transfomrs spherical harmonics coefficients from a sparse array in standard Fourier order
    to a pyshtools SHCoeffs object of given bandlimits.
    Parameters
    ----------
    sparse_coeffs : sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax_sparse + 1, 2 * mmax_sparse + 1).
    lmax : int | None, optional
        The target maximum spherical harmonic degree. If None, it is inferred from the input
        sparse coefficients lmax_sparse.
    Returns
    -------
    pysh.SHCoeffs
        A spherical harmonics coefficients object in pyshtools format. Its shape
        will be (2, lmax + 1, mmax + 1).
    """
    if lmax is not None:
        coeff = set_bandlimits(sparse_coeffs, lmax=lmax)
    else:
        lmax = sparse_coeffs.shape[0] - 1
        coeff = set_bandlimits(sparse_coeffs, lmax=lmax)
    return pysh.SHCoeffs.from_array(unstack_ms(coeff.todense()), **SHT_CONVENTIONS)

grid_template(lmax)

Creates a spherical harmonic grid object of degree lmax filled with complex zeros.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree of the grid.

required

Returns:

Type Description
SHGrid

A spherical harmonic grid object with data set to (complex) zeros.

Source code in src/serval/sht.py
def grid_template(lmax: int) -> pysh.SHGrid:
    r"""Creates a spherical harmonic grid object of degree lmax filled with complex zeros.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree of the grid.
    Returns
    -------
    pysh.SHGrid
        A spherical harmonic grid object with data set to (complex) zeros.
    """
    return pysh.SHGrid.from_zeros(lmax=lmax, kind="complex", **GRID_CONVENTIONS)

make_conjugate(sparse_coeff, return_sparse=True)

Transforms spherical harmonics coefficients to their conjugate form. It can return either a sparse or dense array.

Parameters:

Name Type Description Default
sparse_coeff COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required
return_sparse bool

Whether to return a sparse array or a dense one.

True

Returns:

Type Description
COO | NDArray[complex128]

The conjugate spherical harmonics coefficients array in standard Fourier order, in sparse or dense form.

Source code in src/serval/sht.py
def make_conjugate(
    sparse_coeff: sparse.COO, return_sparse: bool = True
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Transforms spherical harmonics coefficients to their conjugate form. It can return
    either a sparse or dense array.
    Parameters
    ----------
    sparse_coeff : sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    return_sparse : bool, optional
        Whether to return a sparse array or a dense one.
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        The conjugate spherical harmonics coefficients array in standard Fourier order,
        in sparse or dense form.
    """
    lmax = sparse_coeff.shape[0] - 1
    ms = np.arange(sparse_coeff.shape[1]) - lmax
    # This densifies always, could warn but should only be used in testing anyway
    out = (-1) ** np.abs(ms) * sparse_coeff.todense()[:, ::-1]
    if return_sparse:
        return sparse.COO.from_numpy(out)
    else:
        return out

ms_from_array(arr)

Creates a range of integers corresponding to the m values of spherical harmonics from -mmax to +mmax according to the order lmax of the input array.

Parameters:

Name Type Description Default
arr NDArray[complex128]

Any array of shape compatible with spherical harmonics coefficients where the first dimension corresponds to lmax + 1.

required

Returns:

Type Description
NDArray[int_]

An array of integers representing the m values from -mmax to +mmax.

Source code in src/serval/sht.py
def ms_from_array(arr: npt.NDArray[np.complex128]) -> npt.NDArray[np.int_]:
    r"""Creates a range of integers corresponding to the m values of spherical harmonics
    from -mmax to +mmax according to the order lmax of the input array.
    Parameters
    ----------
    arr : npt.NDArray[np.complex128]
        Any array of shape compatible with spherical harmonics coefficients where
        the first dimension corresponds to lmax + 1.
    Returns
    -------
    npt.NDArray[np.int_]
        An array of integers representing the m values from -mmax to +mmax.
    """
    # Gets ms from an mmode array (or anything that has the right shape for one)
    # Assumes no bandlimit in m
    return np.arange(arr.shape[0]) - (arr.shape[0] - 1) // 2

nonzero_bandlimits(coeff)

Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse array based on the non-zero entries.

Parameters:

Name Type Description Default
coeff COO | NDArray[complex128]

A spherical harmonics coefficients sparse array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required

Returns:

Type Description
tuple[int, int]

A tuple containing the maximum spherical harmonic degree lmax and maximum order mmax based on the non-zero coefficients.

Source code in src/serval/sht.py
def nonzero_bandlimits(
    coeff: sparse.COO | npt.NDArray[np.complex128],
) -> tuple[int, int]:
    r"""Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse
    array based on the non-zero entries.
    Parameters
    ----------
    coeff : sparse.COO | npt.NDArray[np.complex128]
        A spherical harmonics coefficients sparse array in standard Fourier order. Its shape
        should be (lmax + 1, 2 * mmax + 1).
    Returns
    -------
    tuple[int, int]
        A tuple containing the maximum spherical harmonic degree lmax and
        maximum order mmax based on the non-zero coefficients.
    """
    nonzero = coeff.nonzero()
    ells = nonzero[0]
    ms = nonzero[1] - (coeff.shape[1] - 1) // 2
    return int(ells.max()), int(np.abs(ms).max())

set_bandlimits(coeffs, lmax, mmax=None)

Reshapes sparse or dense spherical harmonics coefficients to given bandlimits.

Parameters:

Name Type Description Default
coeffs Coeffs

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required
lmax int

The target maximum spherical harmonic degree.

required
mmax int | None

The target maximum spherical harmonic order. If None, it is set equal to lmax.

None

Returns:

Type Description
Coeffs

A sparse or dense spherical harmonics coefficients array reshaped to the target bandlimits.

Source code in src/serval/sht.py
def set_bandlimits(coeffs: Coeffs, lmax: int, mmax: int | None = None) -> Coeffs:
    r"""Reshapes sparse or dense spherical harmonics coefficients to given bandlimits.
    Parameters
    ----------
    coeffs : Coeffs
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    lmax : int
        The target maximum spherical harmonic degree.
    mmax : int | None, optional
        The target maximum spherical harmonic order. If None, it is set equal to lmax.
    Returns
    -------
    Coeffs
        A sparse or dense spherical harmonics coefficients array reshaped to the target bandlimits.
    """
    xp = array_namespace(coeffs)
    if mmax is None:
        mmax = lmax
    out = coeffs.copy()
    start_lmax = coeffs.shape[0] - 1
    delta_lmax = start_lmax - lmax
    if delta_lmax > 0:
        out = out[:-delta_lmax, :]
    elif delta_lmax < 0:
        out = xp.pad(out, ((0, abs(delta_lmax)), (0, 0)), constant_values=0.0)
    start_mmax = (coeffs.shape[1] - 1) // 2
    delta_mmax = start_mmax - mmax
    if delta_mmax > 0:
        out = out[:, delta_mmax:-delta_mmax]
    elif delta_mmax < 0:
        out = xp.pad(
            out, ((0, 0), (abs(delta_mmax), abs(delta_mmax))), constant_values=0.0
        )
    return out

sparse_synthesis(sparse_coeff)

Transfomrs spherical harmonics coefficients from a sparse array to a pyshtools grid.

Parameters:

Name Type Description Default
sparse_coeff COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required

Returns:

Type Description
SHGrid

A spherical harmonic grid object.

Source code in src/serval/sht.py
def sparse_synthesis(sparse_coeff: sparse.COO) -> pysh.SHGrid:
    r"""Transfomrs spherical harmonics coefficients from a sparse array to a pyshtools grid.
    Parameters
    ----------
    sparse_coeff : sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    Returns
    -------
    pysh.SHGrid
        A spherical harmonic grid object.
    """
    return from_sparse(sparse_coeff).expand(
        grid=GRID_CONVENTIONS["grid"], extend=GRID_CONVENTIONS["extend"]
    )

stack_ms(coeffs)

Reorganizes spherical harmonics coefficients from the pyshtools format to the standard Fourrier order.

Parameters:

Name Type Description Default
coeffs NDArray[complex128]

A spherical harmonics coefficients array in pyshtools format. Its shape should be (2, lmax + 1, mmax + 1), where the first dimension corresponds to negative and positive m values.

required

Returns:

Type Description
NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order. Its shape will be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.

Source code in src/serval/sht.py
def stack_ms(coeffs: npt.NDArray[np.complex128]) -> npt.NDArray[np.complex128]:
    r"""Reorganizes spherical harmonics coefficients from the pyshtools format
    to the standard Fourrier order.
    Parameters
    ----------
    coeffs : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in pyshtools format. Its shape
        should be (2, lmax + 1, mmax + 1), where the first dimension
        corresponds to negative and positive m values.
    Returns
    -------
    npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order. Its shape
        will be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.
    """
    neg_ms = coeffs[1, :, :0:-1]
    pos_ms = coeffs[0, ...]
    return np.concatenate([neg_ms, pos_ms], axis=-1)

threshold_bandlimits(coeff, threshold=SPARSE_THRESHOLD)

Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse or dense array based on the non-zero entries, after setting the coefficients below a given threshold to zero.

Parameters:

Name Type Description Default
coeff COO | NDArray[complex128]

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required
threshold float

The threshold below which coefficients are set to zero.

SPARSE_THRESHOLD

Returns:

Type Description
tuple[int, int]

A tuple containing the maximum spherical harmonic degree lmax and maximum order mmax based on the filtered coefficients.

Source code in src/serval/sht.py
def threshold_bandlimits(
    coeff: sparse.COO | npt.NDArray[np.complex128], threshold: float = SPARSE_THRESHOLD
) -> tuple[int, int]:
    r"""Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse or
    dense array based on the non-zero entries, after setting the coefficients below a given
    threshold to zero.
    Parameters
    ----------
    coeff : sparse.COO | npt.NDArray[np.complex128]
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    threshold : float, optional
        The threshold below which coefficients are set to zero.
    Returns
    -------
    tuple[int, int]
        A tuple containing the maximum spherical harmonic degree lmax and
        maximum order mmax based on the filtered coefficients.
    """
    xp = array_namespace(coeff)
    return nonzero_bandlimits(xp.where(sparse.abs(coeff) <= threshold, 0.0, coeff))

to_sparse(coeffs, threshold=SPARSE_THRESHOLD)

Transfomrs spherical harmonics coefficients from pyshtools format to a sparse array in standard Fourier order.

Parameters:

Name Type Description Default
coeffs SHCoeffs

A spherical harmonics coefficients object in pyshtools format. Its shape should be (2, lmax + 1, mmax + 1).

required
threshold float

The threshold below which coefficients are set to zero.

SPARSE_THRESHOLD

Returns:

Type Description
COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape will be (lmax + 1, 2 * mmax + 1).

Source code in src/serval/sht.py
def to_sparse(coeffs: pysh.SHCoeffs, threshold: float = SPARSE_THRESHOLD) -> sparse.COO:
    r"""Transfomrs spherical harmonics coefficients from pyshtools format to a sparse array
    in standard Fourier order.
    Parameters
    ----------
    coeffs : pysh.SHCoeffs
        A spherical harmonics coefficients object in pyshtools format. Its shape
        should be (2, lmax + 1, mmax + 1).
    threshold : float, optional
        The threshold below which coefficients are set to zero.
    Returns
    -------
    sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape will be (lmax + 1, 2 * mmax + 1).
    """
    out = sparse.COO.from_numpy(stack_ms(coeffs.coeffs))
    out = sparse.where(sparse.abs(out) < threshold, 0.0, out)
    return out

unstack_ms(coeffs)

Reorganizes spherical harmonics coefficients from the standard Fourier order to the pyshtools format.

Parameters:

Name Type Description Default
coeffs NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.

required

Returns:

Type Description
NDArray[complex128]

A spherical harmonics coefficients array in pyshtools format. Its shape will be (2, lmax + 1, mmax + 1), where the first dimension corresponds to negative and positive m values.

Source code in src/serval/sht.py
def unstack_ms(coeffs: npt.NDArray[np.complex128]) -> npt.NDArray[np.complex128]:
    r"""Reorganizes spherical harmonics coefficients from the standard Fourier order
    to the pyshtools format.
    Parameters
    ----------
    coeffs : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order. Its shape
        should be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.
    Returns
    -------
    npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in pyshtools format. Its shape
        will be (2, lmax + 1, mmax + 1), where the first dimension
        corresponds to negative and positive m values.
    """
    mmax = (coeffs.shape[1] - 1) // 2
    out = np.zeros(shape=(2, coeffs.shape[0], mmax + 1), dtype=coeffs.dtype)
    out[0, :, :] = coeffs[:, mmax:]
    out[1, :, 1:] = coeffs[:, mmax - 1 :: -1]
    return out

General Utilities (serval.utils.py)

analytic_plane_wave_al(ells, kz)

Computes the analytic spherical harmonic coefficients of a plane wave propagating in the z-direction.

Parameters:

Name Type Description Default
ells NDArray[int64]

The spherical harmonic degrees at which to evaluate the coefficients.

required
kz NDArray[float64]

The wavevector magnitudes at which to evaluate the coefficients.

required

Returns:

Type Description
NDArray[complex128]

The spherical harmonic coefficients of the plane wave at the specified degrees and wavevector magnitudes.

Source code in src/serval/utils.py
def analytic_plane_wave_al(
    ells: npt.NDArray[np.int64], kz: npt.NDArray[np.float64]
) -> npt.NDArray[np.complex128]:
    r"""Computes the analytic spherical harmonic coefficients of a plane wave
    propagating in the z-direction.
    Parameters
    ----------
    ells : npt.NDArray[np.int64]
        The spherical harmonic degrees at which to evaluate the coefficients.
    kz : npt.NDArray[np.float64]
        The wavevector magnitudes at which to evaluate the coefficients.
    Returns
    -------
    npt.NDArray[np.complex128]
        The spherical harmonic coefficients of the plane wave at the specified
        degrees and wavevector magnitudes.
    """
    return spherical_jn(ells, kz) * np.sqrt((2 * ells + 1) * 4 * np.pi) * (1j) ** ells

bandlimited_random_plane_wave(lmax, template_grid, rng, threshold=SPARSE_THRESHOLD)

Generates a random band-limited plane wave on the sphere evaluated on the grid.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree for band-limiting.

required
template_grid SHGrid

A pyshtools SHGrid object that serves as a template for the output grid.

required
rng Generator | None

An optional random number generator for reproducibility. If None, a new generator will be created. Default is None.

required
threshold float

The threshold for determining the plane wave magnitude from the bandlimit.

SPARSE_THRESHOLD

Returns:

Type Description
tuple[SHGrid, NDArray[float64]]

A tuple containing the generated band-limited plane wave grid and the corresponding wavevector.

Source code in src/serval/utils.py
def bandlimited_random_plane_wave(
    lmax: int,
    template_grid: pysh.SHGrid,
    rng: np.random.Generator | None,
    threshold: float = SPARSE_THRESHOLD,
) -> tuple[pysh.SHGrid, npt.NDArray[np.float64]]:
    r"""Generates a random band-limited plane wave on the sphere evaluated on the grid.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree for band-limiting.
    template_grid : pysh.SHGrid
        A pyshtools SHGrid object that serves as a template for the output grid.
    rng : np.random.Generator | None
        An optional random number generator for reproducibility. If None, a new generator
        will be created. Default is None.
    threshold : float, optional
        The threshold for determining the plane wave magnitude from the bandlimit.
    Returns
    -------
    tuple[pysh.SHGrid, npt.NDArray[np.float64]]
        A tuple containing the generated band-limited plane wave grid and the
        corresponding wavevector.
    """
    if rng is None:
        rng = np.random.default_rng()
    unrot_k = np.array([0, 0, plane_wave_mag_from_bandlimit(lmax, threshold=threshold)])
    eulers = rng.random(3) * np.array([2 * np.pi, np.pi, 2 * np.pi])
    rot_mat = rotation_from_eulers(eulers)
    k = rot_mat.apply(unrot_k)
    plane_wave_grid = plane_waves(
        [
            k,
        ],
        template_grid,
    )
    return (
        plane_wave_grid,
        k,
    )

gaussian(tirs_dirvec, fwhm, template)

Generates a Gaussian beam on the sphere, centered on a given direction vector, and returns it as a pysh.SHGrid.

Parameters:

Name Type Description Default
tirs_dirvec NDArray[float64]

The direction vector (in TIRS coordinates) where the Gaussian is centered.

required
fwhm float

The full width at half maximum (FWHM) of the Gaussian beam in radians.

required
template SHGrid

A pyshtools SHGrid object that serves as a template for the output grid.

required

Returns:

Type Description
SHGrid

A pyshtools SHGrid object containing the Gaussian beam data.

Source code in src/serval/utils.py
def gaussian(
    tirs_dirvec: npt.NDArray[np.float64], fwhm: float, template: pysh.SHGrid
) -> pysh.SHGrid:
    r"""Generates a Gaussian beam on the sphere, centered on a given direction vector,
    and returns it as a pysh.SHGrid.
    Parameters
    ----------
    tirs_dirvec : npt.NDArray[np.float64]
        The direction vector (in TIRS coordinates) where the Gaussian is centered.
    fwhm : float
        The full width at half maximum (FWHM) of the Gaussian beam in radians.
    template : pysh.SHGrid
        A pyshtools SHGrid object that serves as a template for the output grid.
    Returns
    -------
    pysh.SHGrid
        A pyshtools SHGrid object containing the Gaussian beam data."""
    theta, phi = np.meshgrid(
        np.pi / 2 - template.lats(degrees=False),
        template.lons(degrees=False),
        indexing="ij",
    )
    nhat = spherical_to_normal(theta, phi)
    out_grid = template.copy()
    sigma = gaussian_fwhm_to_sigma * fwhm
    projections = nhat @ tirs_dirvec
    off = np.any(
        (projections < -1 - FLOAT_THRESHOLD) | (projections > 1 + FLOAT_THRESHOLD)
    )
    if off:
        raise ValueError("The projections of tirs_dirvec must be in the range [-1, 1].")
    projections = np.clip(projections, -1.0, 1.0)
    out_grid.data = np.exp(-((np.arccos(projections)) ** 2) / 2 / sigma**2)
    return out_grid

harmonic_point_source(ra_deg, dec_deg, lmax)

Generates band-limited spherical harmonic coefficients for a point source at given RA and Dec.

Parameters:

Name Type Description Default
ra_deg float

The right ascension of the point source in degrees.

required
dec_deg float

The declination of the point source in degrees.

required
lmax int

The maximum spherical harmonic degree.

required

Returns:

Type Description
COO

The band-limited spherical harmonic coefficients of the point source in sparse format.

Source code in src/serval/utils.py
def harmonic_point_source(ra_deg: float, dec_deg: float, lmax: int) -> sparse.COO:
    r"""Generates band-limited spherical harmonic coefficients for a point source
    at given RA and Dec.
    Parameters
    ----------
    ra_deg : float
        The right ascension of the point source in degrees.
    dec_deg : float
        The declination of the point source in degrees.
    lmax : int
        The maximum spherical harmonic degree.
    Returns
    -------
    sparse.COO
        The band-limited spherical harmonic coefficients of the point source in sparse format.
    """
    template = grid_template(lmax).expand(**SHT_CONVENTIONS)
    template.coeffs[0, :, 0] = 1.0
    template /= integral_from_coeff(template)  # Flux normalize
    ps_theta = np.pi / 2 - np.radians(dec_deg)
    ps_phi = np.radians(ra_deg)
    rot_ps_coeffs = template.rotate(
        *inv_eulers((ps_phi, ps_theta, 0)), **ROTATE_CONVENTIONS
    )
    sparse_ps_alm = to_sparse(rot_ps_coeffs)
    return sparse_ps_alm

integral_from_coeff(coeff)

Computes the integral over the sphere from spherical harmonic coefficients.

Parameters:

Name Type Description Default
coeff SHCoeffs

The spherical harmonic coefficients to integrate in pyshtools format.

required

Returns:

Type Description
float

The integral of the function represented by the spherical harmonic coefficients over the unit sphere.

Source code in src/serval/utils.py
def integral_from_coeff(coeff: pysh.SHCoeffs) -> float:
    r"""Computes the integral over the sphere from spherical harmonic coefficients.
    Parameters
    ----------
    coeff : pysh.SHCoeffs
        The spherical harmonic coefficients to integrate in pyshtools format.
    Returns
    -------
    float
        The integral of the function represented by the spherical harmonic coefficients
        over the unit sphere.
    """
    # +, l=0, |m|=0, assumes SHT_CONVENTIONS
    return coeff.coeffs[0][0, 0] * np.sqrt(4 * np.pi)

integral_from_grid(grid_integrand)

Computes the integral over the sphere from a spherical harmonic grid.

Parameters:

Name Type Description Default
coeff SHGrid

The spherical harmonic grid to integrate in pyshtools format.

required

Returns:

Type Description
float

The integral of the function represented by the spherical harmonic grid over the unit sphere.

Source code in src/serval/utils.py
def integral_from_grid(grid_integrand: pysh.SHGrid) -> float:
    r"""Computes the integral over the sphere from a spherical harmonic grid.
    Parameters
    ----------
    coeff : pysh.SHGrid
        The spherical harmonic grid to integrate in pyshtools format.
    Returns
    -------
    float
        The integral of the function represented by the spherical harmonic grid
        over the unit sphere.
    """
    return integral_from_coeff(grid_integrand.expand(lmax_calc=0, **SHT_CONVENTIONS))

mmodes_to_visibilities(mmodes, m1max=None, ms=None)

Computes visibilities from m-modes using inverse FFT.

Parameters:

Name Type Description Default
mmodes NDArray[complex128]

The input m-modes with shape (..., n_mmodes). They are assumed to be mmodes are assumed to be in unshifted FFT order.

required
m1max int | None

The user defined maximum absolute m-mode index. If None, it is assumed that mmodes are already in full format (from -m1max to +m1max). Default is None.

None
ms NDArray[int64] | None

The specific m-mode indices corresponding to the input mmodes. If None, it is assumed that mmodes are in full format. Default is None.

None

Returns:

Type Description
NDArray[complex128]

The output visibilities with shape (..., n_times).

Source code in src/serval/utils.py
def mmodes_to_visibilities(
    mmodes: npt.NDArray[np.complex128],
    m1max: int | None = None,
    ms: npt.NDArray[np.int64] | None = None,
) -> npt.NDArray[np.complex128]:
    r"""Computes visibilities from m-modes using inverse FFT.
    Parameters
    ----------
    mmodes : npt.NDArray[np.complex128]
        The input m-modes with shape (..., n_mmodes). They are assumed to be mmodes are assumed to
        be in unshifted FFT order.
    m1max : int | None, optional
        The user defined maximum absolute m-mode index. If None, it is assumed that mmodes are
        already in full format (from -m1max to +m1max). Default is None.
    ms : npt.NDArray[np.int64] | None, optional
        The specific m-mode indices corresponding to the input mmodes. If None, it is
        assumed that mmodes are in full format. Default is None.
    Returns
    -------
    npt.NDArray[np.complex128]
        The output visibilities with shape (..., n_times).
    """
    if ms is None:
        full_mmodes = np.fft.fftshift(mmodes, axes=-1)
    else:
        if not isinstance(m1max, int):
            raise TypeError("m1max must be an integer, since ms is not None.")
        if mmodes.ndim == 1:
            full_mmodes = np.zeros(2 * m1max + 1, dtype=np.complex128)
            fftfreq = np.fft.ifftshift(
                np.linspace(-m1max, m1max, 2 * m1max + 1)
            ).astype(int)
            _, comm1, _ = np.intersect1d(fftfreq, ms, return_indices=True)
            full_mmodes[comm1] = mmodes
        else:
            full_mmodes = np.zeros(
                mmodes.shape[:-1] + (2 * m1max + 1,), dtype=np.complex128
            )
            fftfreq = np.fft.ifftshift(
                np.linspace(-m1max, m1max, 2 * m1max + 1)
            ).astype(int)
            _, comm1, _ = np.intersect1d(fftfreq, ms, return_indices=True)
            full_mmodes[..., comm1] = mmodes

    return np.fft.ifft(full_mmodes, axis=-1)

plane_wave_bandlimits(k, threshold=SPARSE_THRESHOLD)

Computes the spherical harmonic bandlimits lmax, mmax required to represent a plane wave with wavevector k to a given threshold.

Parameters:

Name Type Description Default
k NDArray[float64]

The wavevector of the plane wave.

required
threshold float

The threshold for determining the bandlimits. Default is SPARSE_THRESHOLD.

SPARSE_THRESHOLD

Returns:

Type Description
tuple[int, int]

A tuple containing the maximum spherical harmonic degree lmax and maximum absolute order mmax required to represent the plane wave.

Source code in src/serval/utils.py
def plane_wave_bandlimits(
    k: npt.NDArray[np.float64], threshold: float = SPARSE_THRESHOLD
) -> tuple[int, int]:
    r"""Computes the spherical harmonic bandlimits lmax, mmax required to
    represent a plane wave with wavevector k to a given threshold.
    Parameters
    ----------
    k : npt.NDArray[np.float64]
        The wavevector of the plane wave.
    threshold : float, optional
        The threshold for determining the bandlimits. Default is SPARSE_THRESHOLD.
    Returns
    -------
    tuple[int, int]
        A tuple containing the maximum spherical harmonic degree lmax and
        maximum absolute order mmax required to represent the plane wave.
    """
    # Threshold in |al|, not power
    mag_k = np.linalg.norm(k)
    lstart = max(0, int(np.round(mag_k)) - 1)
    lstop = max(13, 10 * lstart)
    plane_wave_al = np.abs(analytic_plane_wave_al(np.arange(lstart, lstop), mag_k))
    plane_wave_al /= plane_wave_al.max()
    lmax = np.argmin(plane_wave_al >= threshold) + lstart
    if mag_k == 0.0:
        mmax = 0  # +1 maybe?
    else:
        mmax = int(np.round(lmax * np.sqrt(k[0] ** 2 + k[1] ** 2) / mag_k))  # +1 maybe?
    return int(lmax), mmax

plane_wave_mag_from_bandlimit(lmax, threshold=SPARSE_THRESHOLD)

Computes the maximum usable plane-wave wavenumber |k| that can be represented on a spherical-harmonic grid of bandlimit lmax.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree for band-limiting.

required
threshold float

The threshold for determining the plane wave magnitude from the bandlimit.

SPARSE_THRESHOLD

Returns:

Type Description
float

The maximum usable plane-wave wavenumber |k|.

Source code in src/serval/utils.py
def plane_wave_mag_from_bandlimit(
    lmax: int, threshold: float = SPARSE_THRESHOLD
) -> float:
    r"""Computes the maximum usable plane-wave wavenumber |k| that can be represented
    on a spherical-harmonic grid of bandlimit lmax.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree for band-limiting.
    threshold : float, optional
        The threshold for determining the plane wave magnitude from the bandlimit.
    Returns
    -------
    float
        The maximum usable plane-wave wavenumber |k|.
    """
    at_bandlimit = np.abs(analytic_plane_wave_al(lmax, lmax))

    def to_opt(mag_k):
        return np.log10(
            np.abs(analytic_plane_wave_al(lmax, mag_k)) / at_bandlimit
        ) - np.log10(threshold)

    with np.errstate(divide="ignore"):
        return brentq(to_opt, 0, lmax)

plane_waves(ks, template)

Produces a band-limited spherical plane wave evaluated on the grid from wavevectors ks.

Parameters:

Name Type Description Default
ks list[NDArray[float64]]

A list of wavevector arrays. Each array should have shape (3,).

required
template SHGrid

A pyshtools SHGrid object that serves as a template for the output grid.

required

Returns:

Type Description
SHGrid

A pyshtools SHGrid object containing the plane wave data.

Source code in src/serval/utils.py
def plane_waves(
    ks: list[npt.NDArray[np.float64]], template: pysh.SHGrid
) -> pysh.SHGrid:
    r"""Produces a band-limited spherical plane wave evaluated on the grid from wavevectors ks.
    Parameters
    ----------
    ks : list[npt.NDArray[np.float64]]
        A list of wavevector arrays. Each array should have shape (3,).
    template : pysh.SHGrid
        A pyshtools SHGrid object that serves as a template for the output grid.
    Returns
    -------
    pysh.SHGrid
        A pyshtools SHGrid object containing the plane wave data."""
    theta, phi = np.meshgrid(
        np.pi / 2 - template.lats(degrees=False),
        template.lons(degrees=False),
        indexing="ij",
    )
    nhat = spherical_to_normal(theta, phi)
    out_grid = template.copy()
    tot_k = np.sum(ks, axis=0)
    if np.linalg.norm(tot_k) > template.lmax:
        raise ValueError(
            f"Harmonic order of grid is too low to sample plane wave at bandlimit. "
            f"|k| = {np.linalg.norm(tot_k):.4f} > {template.lmax:.4f}"
        )
    out_grid.data = np.exp(1j * (nhat @ tot_k))
    return out_grid

plane_waves_integral(ks)

Computes the integral over the unit sphere of plane waves with wavevectors ks.

\[ \int_{S^2} e^{\,i\,\mathbf{k}\cdot\mathbf{n}}\, d\Omega = 4\pi\, \frac{\sin\lvert \mathbf{k}\rvert}{\lvert \mathbf{k}\rvert} \]

Parameters:

Name Type Description Default
ks list[NDArray[float64]]

A list of wavevector arrays. Each array should have shape (..., 3), where ... represents any number of leading dimensions.

required

Returns:

Type Description
float | NDArray[float64]

The integral of the product of plane waves over the unit sphere. The output shape is the broadcasted shape of the input wavevector arrays without the last dimension.

Source code in src/serval/utils.py
def plane_waves_integral(
    ks: list[npt.NDArray[np.float64]],
) -> float | npt.NDArray[np.float64]:
    r"""Computes the integral over the unit sphere of plane waves with wavevectors ks.

    $$
    \int_{S^2} e^{\,i\,\mathbf{k}\cdot\mathbf{n}}\, d\Omega
        = 4\pi\, \frac{\sin\lvert \mathbf{k}\rvert}{\lvert \mathbf{k}\rvert}
    $$

    Parameters
    ----------
    ks : list[npt.NDArray[np.float64]]
        A list of wavevector arrays. Each array should have shape (..., 3), where
        ... represents any number of leading dimensions.
    Returns
    -------
    float | npt.NDArray[np.float64]
        The integral of the product of plane waves over the unit sphere. The output
        shape is the broadcasted shape of the input wavevector arrays without the last
        dimension.
    """
    # Can be broadcastable vector stacks with xyz on -1 axis.
    total_k = np.sum(np.array(np.broadcast_arrays(*ks)), axis=0)
    mag_k = np.linalg.norm(total_k, axis=-1)
    return 4 * np.pi * np.sinc(mag_k / np.pi)

plane_waves_mmodes(ks, mmax)

Computes the m-modes of a product of plane waves by rotating one of the k-vectors (the first one) through all azimuthal angles to create visibilities.

Parameters:

Name Type Description Default
ks list[NDArray[float64]]

A list of wavevector arrays. Each array should have shape (3,).

required
mmax int

The maximum absolute m-mode index.

required

Returns:

Type Description
NDArray[complex128]

The m-modes of the product of plane waves, with shape (2 * mmax + 1,).

Source code in src/serval/utils.py
def plane_waves_mmodes(
    ks: list[npt.NDArray[np.float64]], mmax: int
) -> npt.NDArray[np.complex128]:
    r"""Computes the m-modes of a product of plane waves by rotating one of the k-vectors
    (the first one) through all azimuthal angles to create visibilities.
    Parameters
    ----------
    ks : list[npt.NDArray[np.float64]]
        A list of wavevector arrays. Each array should have shape (3,).
    mmax : int
        The maximum absolute m-mode index.
    Returns
    -------
    npt.NDArray[np.complex128]
        The m-modes of the product of plane waves, with shape (2 * mmax + 1,).
    """
    # Rotates the first plane wave relative to the others.
    eras = np.linspace(0, 2 * np.pi, 2 * mmax + 1, endpoint=False)
    # Rotate k0 from cirs to tirs basis (sky-like)
    rot_k0s = np.array([tirs_to_cirs(era).inv().apply(ks[0]) for era in eras])
    pw_vis = plane_waves_integral(
        [
            rot_k0s,
        ]
        + ks[1:]
    )
    mmodes = visibilities_to_mmodes(pw_vis)
    return mmodes

pointed_gaussian(latitude, longitude, fwhm, template, altitude=np.pi / 2, azimuth=np.pi)

Generates a Gaussian beam on the sphere, with a beam pointing in a given direction from a given latitude and longitude, and returns it as a pysh.SHGrid.

Parameters:

Name Type Description Default
latitude float

The latitude of the telescope in radians.

required
longitude float

The longitude of the telescope in radians.

required
fwhm float

The full width at half maximum (FWHM) of the Gaussian beam in radians.

required
template SHGrid

A pyshtools SHGrid object that serves as a template for the output grid.

required
altitude float

The altitude angle of the pointing direction in radians. Default is pi/2.

pi / 2
azimuth float

The azimuth angle of the pointing direction in radians. Default is pi.

pi

Returns:

Type Description
SHGrid

A pyshtools SHGrid object containing the Gaussian beam data.

Source code in src/serval/utils.py
def pointed_gaussian(
    latitude: float,
    longitude: float,
    fwhm: float,
    template: pysh.SHGrid,
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
) -> pysh.SHGrid:
    r"""Generates a Gaussian beam on the sphere, with a beam pointing in a given
    direction from a given latitude and longitude, and returns it as a pysh.SHGrid.
    Parameters
    ----------
    latitude : float
        The latitude of the telescope in radians.
    longitude : float
        The longitude of the telescope in radians.
    fwhm : float
        The full width at half maximum (FWHM) of the Gaussian beam in radians.
    template : pysh.SHGrid
        A pyshtools SHGrid object that serves as a template for the output grid.
    altitude : float, optional
        The altitude angle of the pointing direction in radians. Default is pi/2.
    azimuth : float, optional
        The azimuth angle of the pointing direction in radians. Default is pi.
    Returns
    -------
    pysh.SHGrid
        A pyshtools SHGrid object containing the Gaussian beam data.
    """
    pointing_to_tirs = tirs_to_pointing(
        latitude=latitude,
        longitude=longitude,
        altitude=altitude,
        azimuth=azimuth,
        boresight=0.0,
    ).inv()
    tirs_dirvec = pointing_to_tirs.apply(np.array([0, 0, 1]))
    return gaussian(tirs_dirvec=tirs_dirvec, fwhm=fwhm, template=template)

power_law_sky(lmax, power_law_index=2.0, seed=None)

Generates a random sky realization with a power-law angular power spectrum.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree.

required
power_law_index float

The index of the power-law angular power spectrum. Default is 2.0.

2.0
seed int | None

The random seed for reproducibility. Default is None.

None

Returns:

Type Description
SHGrid

The generated random sky realization as a pyshtools spherical harmonic grid.

Source code in src/serval/utils.py
def power_law_sky(
    lmax: int, power_law_index: float = 2.0, seed: int | None = None
) -> pysh.SHGrid:
    r"""Generates a random sky realization with a power-law angular power spectrum.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree.
    power_law_index : float, optional
        The index of the power-law angular power spectrum. Default is 2.0.
    seed : int | None, optional
        The random seed for reproducibility. Default is None.
    Returns
    -------
    pysh.SHGrid
        The generated random sky realization as a pyshtools spherical harmonic grid.
    """
    ells = np.arange(lmax + 1, dtype=float)
    power = np.zeros(lmax + 1, dtype=float)
    power[1:] = ells[1:] ** (-power_law_index)
    pl_sky_alm = stack_ms(
        pysh.SHCoeffs.from_random(
            power, kind="complex", seed=seed, **SHT_CONVENTIONS
        ).coeffs
    )
    pl_sky_grid = array_synthesis(pl_sky_alm)
    pl_sky_grid.data.imag = 0.0  # Make sky real.
    return pl_sky_grid

spherical_to_normal(theta, phi)

Transforms arrays of spherical coordinates to arrays of Cartesian unit normal vectors.

Parameters:

Name Type Description Default
theta NDArray[float64]

The polar angles in radians.

required
phi NDArray[float64]

The azimuthal angles in radians.

required

Returns:

Type Description
NDArray[float64]

The Cartesian unit normal vectors corresponding to the input spherical angles.

Source code in src/serval/utils.py
def spherical_to_normal(
    theta: npt.NDArray[np.float64], phi: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
    r"""Transforms arrays of spherical coordinates to arrays of Cartesian unit normal vectors.
    Parameters
    ----------
    theta : npt.NDArray[np.float64]
        The polar angles in radians.
    phi : npt.NDArray[np.float64]
        The azimuthal angles in radians.
    Returns
    -------
    npt.NDArray[np.float64]
        The Cartesian unit normal vectors corresponding to the input spherical angles.
    """
    st = np.sin(theta)
    return np.stack([np.cos(phi) * st, np.sin(phi) * st, np.cos(theta)], axis=-1)

visibilities_to_mmodes(vis)

Computes m-modes from visibilities using FFT.

Parameters:

Name Type Description Default
vis NDArray[complex128]

The input visibilities with shape (..., n_times).

required

Returns:

Type Description
NDArray[complex128]

The output m-modes with shape (..., n_times), where the last axis corresponds to m-modes ordered from -mmax to +mmax.

Source code in src/serval/utils.py
def visibilities_to_mmodes(
    vis: npt.NDArray[np.complex128],
) -> npt.NDArray[np.complex128]:
    r"""Computes m-modes from visibilities using FFT.
    Parameters
    ----------
    vis : npt.NDArray[np.complex128]
        The input visibilities with shape (..., n_times).
    Returns
    -------
    npt.NDArray[np.complex128]
        The output m-modes with shape (..., n_times), where the last axis corresponds to m-modes
        ordered from -mmax to +mmax.
    """
    return np.fft.fftshift(np.fft.fft(vis, axis=-1), axes=-1) / vis.shape[-1]

Core Gaunt Coefficient Utilities (serval.gaunt.core.py)

gaunt_dot12(alm1, alm2, l3max, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute a projector for alm3 for the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over l1, l2, m2 and m1 (if requested).

TODO add formula.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

Spherical harmonic co-efficients in l1, m1.

required
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
l3max int

Maximum l3 to compute co-efficients up to.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128]

m1-mode alm3 projector as a numpy array of shape (Nm1, l3max+1, 2l3max+1) if sum_m1 is False or (l3max+1, 2l3max+1) is sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot12(
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    l3max: int,
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128]:
    """Compute a projector for alm3 for the integral of the triple product
    of spherical harmonics by computing the Gaunt co-efficients in-place
    and sum-producting over l1, l2, m2 and m1 (if requested).

    TODO add formula.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l1, m1.
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    l3max : int
        Maximum l3 to compute co-efficients up to.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128]
        m1-mode alm3 projector as a numpy array of shape (Nm1, l3max+1, 2*l3max+1)
        if sum_m1 is False or (l3max+1, 2*l3max+1) is sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = alm1.shape[0]
    nm1 = len(_gaunt.m1_indexing(alm1.shape[0] - 1, absm1_lower, absm1_upper)[0])
    result = np.zeros((nm1, l3max + 1, 2 * l3max + 1), dtype=np.complex128)
    _gaunt.inplace_dot12(alm1, alm2, result, l3max, absm1_lower, absm1_upper)
    if sum_m1:
        return np.sum(result, axis=0)
    else:
        return result

gaunt_dot123(alm1, alm2, alm3, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over all harmonic degrees and orders except m1, unless requested.

TODO add formula.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

Spherical harmonic co-efficients in l1, m1.

required
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
alm3 NDArray[complex128]

Spherical harmonic co-efficients in l3, m3.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128] | float

Numpy array, shape (Nm1), of m1-modes or their sum if sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot123(
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128] | float:
    """Compute the integral of the triple product of spherical harmonics
    by computing the Gaunt co-efficients in-place and sum-producting over all
    harmonic degrees and orders except m1, unless requested.

    TODO add formula.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l1, m1.
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    alm3 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l3, m3.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128] | float
        Numpy array, shape (Nm1), of m1-modes or their sum if sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = alm1.shape[0]
    nm1 = len(_gaunt.m1_indexing(alm1.shape[0] - 1, absm1_lower, absm1_upper)[0])
    result = np.zeros(nm1, dtype=np.complex128)
    _gaunt.inplace_dot123(alm1, alm2, alm3, result, absm1_lower, absm1_upper)
    if sum_m1:
        return result.sum()
    else:
        return result

gaunt_dot23(alm2, alm3, l1max, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute a projector for alm1 for the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over l2, m2, l3, m3 and m1 (if requested).

TODO add formula.

Parameters:

Name Type Description Default
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
alm3 NDArray[complex128]

Spherical harmonic co-efficients in l3, m3.

required
l1max int

Maximum l1 to compute co-efficients up to.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128]

m1-mode alm1 projector as a numpy array of shape (Nm1, l1max+1) if sum_m1 is False or (l1max+1,) is sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot23(
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    l1max: int,
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128]:
    """Compute a projector for alm1 for the integral of the triple product
    of spherical harmonics by computing the Gaunt co-efficients in-place
    and sum-producting over l2, m2, l3, m3 and m1 (if requested).

    TODO add formula.

    Parameters
    ----------
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    alm3 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l3, m3.
    l1max : int
        Maximum l1 to compute co-efficients up to.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128]
        m1-mode alm1 projector as a numpy array of shape (Nm1, l1max+1)
        if sum_m1 is False or (l1max+1,) is sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = l1max + 1
    nm1 = len(_gaunt.m1_indexing(l1max, absm1_lower, absm1_upper)[0])
    result = np.zeros((nm1, l1max + 1), dtype=np.complex128)
    _gaunt.inplace_dot23(alm2, alm3, result, l1max, absm1_lower, absm1_upper)
    if sum_m1:
        return np.sum(result, axis=0)
    else:
        return result

gaunts_coo(l1max, l2max, l3max)

Compute a sparse representation of all Gaunt co-efficients up to maximum harmonic degrees l1max, l2max and l3max.

Parameters:

Name Type Description Default
l1max int

Maximum harmonic degree l1.

required
l2max int

Maximum harmonic degree l2.

required
l3max int

Maximum harmonic degree l3.

required

Returns:

Type Description
COO

COO formatted sparse array of the Gaunt co-efficients.

Source code in src/serval/gaunt/core.py
def gaunts_coo(l1max: int, l2max: int, l3max: int) -> sparse.COO:
    """Compute a sparse representation of all Gaunt co-efficients up
    to maximum harmonic degrees l1max, l2max and l3max.

    Parameters
    ----------
    l1max : int
        Maximum harmonic degree l1.
    l2max : int
        Maximum harmonic degree l2.
    l3max : int
        Maximum harmonic degree l3.

    Returns
    -------
    sparse.COO
        COO formatted sparse array of the Gaunt co-efficients.
    """
    coords, data, shape = _gaunt.gaunts_coo(l1max, l2max, l3max)
    coords = np.array(coords)
    coords = coords.view(dtype=coords.dtype[0]).reshape(-1, 6).T
    data = np.array(data)
    return sparse.COO(
        coords=coords,
        data=data,
        shape=shape,
        has_duplicates=False,
        sorted=True,
        fill_value=0.0,
    )

gaunts_l1m1_coo(l1, absm1, l1max, l2max, l3max)

Compute a sparse representation of all Gaunt co-efficients with given harmonic degree l1 and harmonic order |m1|, up to maximum harmonic degrees l1max, l2max and l3max. The shape of the coordinates is the same as for the full gaunt coeffcients (gaunts_coo). The only difference is that the first two indices are l1 and m1, set to the input values.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
absm1 int

Harmonic order |m1|.

required
l1max int

Maximum harmonic degree l1.

required
l2max int

Maximum harmonic degree l2.

required
l3max int

Maximum harmonic degree l3.

required

Returns:

Type Description
COO

COO formatted sparse array of the Gaunt co-efficients.

Source code in src/serval/gaunt/core.py
def gaunts_l1m1_coo(
    l1: int, absm1: int, l1max: int, l2max: int, l3max: int
) -> sparse.COO:
    """Compute a sparse representation of all Gaunt co-efficients with given harmonic degree
    l1 and harmonic order |m1|, up to maximum harmonic degrees l1max, l2max and l3max. The shape
    of the coordinates is the same as for the full gaunt coeffcients (gaunts_coo).
    The only difference is that the first two indices are l1 and m1, set to the input values.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    absm1 : int
        Harmonic order |m1|.
    l1max : int
        Maximum harmonic degree l1.
    l2max : int
        Maximum harmonic degree l2.
    l3max : int
        Maximum harmonic degree l3.

    Returns
    -------
    sparse.COO
        COO formatted sparse array of the Gaunt co-efficients.
    """
    # Why is l1max needed...?
    coords, data, shape = _gaunt.gaunts_l1m1_coo(l1, absm1, l1max, l2max, l3max)
    coords = np.array(coords)
    coords = coords.view(dtype=coords.dtype[0]).reshape(-1, 6).T
    data = np.array(data)
    return sparse.COO(
        coords=coords,
        data=data,
        shape=shape,
        has_duplicates=False,
        sorted=True,
        fill_value=0.0,
    )

integrator12_contract3(int12, contract3)

(m1 l3 m3) (l3 m3' m3) -> (m1 l3 m3')

Source code in src/serval/gaunt/core.py
def integrator12_contract3(
    int12: npt.NDArray[np.complex128], contract3: npt.NDArray[np.complex128]
) -> npt.NDArray[np.complex128]:
    """(m1 l3 m3) (l3 m3' m3) -> (m1 l3 m3')"""
    # Check m3' axis smaller than m3 axis
    if int12.ndim == 3:  # Has no frequency axis
        result = np.zeros(
            (int12.shape[0], contract3.shape[0], contract3.shape[1]),
            dtype=np.complex128,
        )
        _contract.integrator12_contract3(int12, contract3, result)
        return result
    elif int12.ndim == 4:  # Has frequency axis
        result = np.zeros(
            (int12.shape[0] * int12.shape[1], contract3.shape[0], contract3.shape[1]),
            dtype=np.complex128,
        )
        _contract.integrator12_contract3(
            int12.reshape((-1,) + int12.shape[2:]), contract3, result
        )
        return result.reshape(
            (int12.shape[0], int12.shape[1], contract3.shape[0], contract3.shape[1])
        )
    else:
        raise ValueError

single_gaunt(l1, l2, l3, m1, m2)

Computes a single Gaunt co-efficient for given harmonic degrees and orders. This uses Wigner-3j family algorithms so it is not efficient for computing many Gaunts co-efficients. Primarily for testing purposes m3 is determined by m1 + m2 + m3 = 0.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m1 int

Harmonic order m1.

required
m2 int

Harmonic order m2.

required

Returns:

Type Description
float

Computed Gaunt co-efficient.

Source code in src/serval/gaunt/core.py
def single_gaunt(l1: int, l2: int, l3: int, m1: int, m2: int) -> float:
    """Computes a single Gaunt co-efficient for given harmonic degrees
    and orders. This uses Wigner-3j family algorithms so it is not
    efficient for computing many Gaunts co-efficients. Primarily for
    testing purposes m3 is determined by m1 + m2 + m3 = 0.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m1 : int
        Harmonic order m1.
    m2 : int
        Harmonic order m2.

    Returns
    -------
    float
        Computed Gaunt co-efficient.
    """
    triangle = abs(l1 - l3) <= l2 <= abs(l1 + l3)
    parity = (l1 + l2 + l3) % 2 != 0
    if (
        not triangle
        or l1 < 0
        or l2 < 0
        or l3 < 0
        or abs(m1) > l1
        or abs(m2) > l2
        or abs(m1 + m2) > l3
        or parity      # parity condition: the sum of the harmonic degrees must be even
    ):
        return 0.0
    return _gaunt.single_gaunt(l1, l2, l3, m1, m2)

wigner_3jj(l2, l3, m2, m3)

Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3 for harmonic degrees m1, m2, m3 = -m1 -m2.

Parameters:

Name Type Description Default
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m2 int

Harmonic order m2.

required
m3 int

Harmonic order m3.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic degree l1min and array of Wigner-3j values of length l1max - l1min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jj(
    l2: int, l3: int, m2: int, m3: int
) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3
    for harmonic degrees m1, m2, m3 = -m1 -m2.

    Parameters
    ----------
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m2 : int
        Harmonic order m2.
    m3 : int
        Harmonic order m3.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic degree l1min and array of Wigner-3j values of
        length l1max - l1min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    # In case there is an error in the result, pyshtools Wigner3j can also be used.
    # However, it carries extra zeros that need to be removed:
    # Wigner_3j = pysh.utils.Wigner3j(l2, l3, -m2 - m3, m2, m3)[0]
    # lmin = max(abs(l2 - l3), abs(-m2 -m3))
    # lmax = l2 + l3
    # lnum = lmax - lmin + 1
    # return lmin, Wigner_3j[: lnum + 1 :]
    if m2 == 0 and m3 == 0:
        return wigner_3jj_000(l2, l3)
    if l2 < 0 or l3 < 0 or abs(m2) > l2 or abs(m3) > l3:
        return (-1, np.array([]))
    result = _wigner.wigner_3jj_nochecks(l2, l3, m2, m3)
    return result[0], np.array(result[1])

wigner_3jj_000(l2, l3)

Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3 for harmonic degrees m1 = m2 = m3 = 0.

Parameters:

Name Type Description Default
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic order l1min and array of Wigner-3j values of length l1max - l1min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jj_000(l2: int, l3: int) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3
    for harmonic degrees m1 = m2 = m3 = 0.

    Parameters
    ----------
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic order l1min and array of Wigner-3j values of
        length l1max - l1min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    if l2 < 0 or l3 < 0:
        return (-1, np.array([]))
    result = _wigner.wigner_3jj_000_nochecks(l2, l3)
    return result[0], np.array(result[1])

wigner_3jm(l1, l2, l3, m1)

Compute the family of Wigner-3j terms for harmonic degrees l1, l2 and l3 all non-zero terms with harmonic order m1.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m1 int

Harmonic order m1.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic order m2min and array of Wigner-3j values of length l2max - l2min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jm(
    l1: int, l2: int, l3: int, m1: int
) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of Wigner-3j terms for harmonic degrees l1, l2 and l3
    all non-zero terms with harmonic order m1.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m1 : int
        Harmonic order m1.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic order m2min and array of Wigner-3j values of
        length l2max - l2min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    triangle = abs(l1 - l3) <= l2 <= abs(l1 + l3)
    if (
        not triangle
        or l1 < 0
        or l2 < 0
        or l3 < 0
        or abs(m1) > l1
        or not (abs(l1 - l3) < l2 < l1 + l3)
    ):
        return (-1, np.array([]))
    result = _wigner.wigner_3jm_nochecks(l1, l2, l3, m1)
    return result[0], np.array(result[1])