Skip to content

API

Under Construction

Core Module (serval.core.py)

PowerBeamVisProjector

Bases: VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
power_beam_lmax int
required
frequencies_MHz ndarray
required
baseline_lmax int | None
None
sky_lmax int | None
None
sky_absm_limits list[int | None]
[0, None]
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
generate_pointing_contractor_on_init bool
False
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt', 'gaunt-opt']
'gaunt-opt'
pointing_contract bool
False
pointing_altitude float | None
None
pointing_azimuth float | None
None
pointing_boresight float | None
None
pointed_beam_mmax int | None
None
aberrate_baseline bool
False

Attributes:

Name Type Description
pointing_contractor ndarray | None
Source code in src/serval/core.py
@define(eq=False)
class PowerBeamVisProjector(VisProjector):
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    power_beam_lmax: int
    frequencies_MHz: np.ndarray

    baseline_lmax: int | None = None
    sky_lmax: int | None = None
    sky_absm_limits: list[int | None] = field(factory=lambda: [0, None])
    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    generate_pointing_contractor_on_init: bool = False
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt", "gaunt-opt"] = "gaunt-opt"
    pointing_contract: bool = False
    pointing_altitude: float | None = None
    pointing_azimuth: float | None = None
    pointing_boresight: float | None = None
    pointed_beam_mmax: int | None = None
    aberrate_baseline: bool = False

    pointing_contractor: np.ndarray | None = field(init=False, default=None)
    _baseline_cache_T: np.ndarray | None = field(init=False, default=None)

    _to_attrs: ClassVar[list[str]] = [
        "latitude",
        "longitude",
        "power_beam_lmax",
        "baseline_lmax",
        "sky_lmax",
        "baseline_enu",
        "aberrate_baseline",
        "pointing_contract",
        "pointing_altitude",
        "pointing_azimuth",
        "pointing_boresight",
        "pointed_beam_mmax",
    ]
    _to_zarr_store: ClassVar[list[str]] = ["frequencies_MHz"]
    _opt_tag: ClassVar[str] = "opt12"

    def __attrs_post_init__(self):
        if self.generate_baseline_cache_on_init:
            self.generate_baseline_cache()
        lmax, self.baseline_mmax = baseline_bandlimits(
            self.baseline_enu,
            self.frequencies_MHz.max(),
            self.latitude,
            self.longitude,
        )
        if self.baseline_lmax is None:
            self.baseline_lmax = lmax
        self.set_bandlimits_from_baseline()
        if self.generate_gaunt_cache_on_init:
            self.generate_gaunt_cache()
        else:
            self.update_integrator()
        if self.pointing_contract:
            if self.pointed_beam_mmax is None:
                raise ValueError(
                    "Cannot determine pointing_contractor with the inputs given, "
                    "because pointed_beam_mmax is None."
                )
            if self.pointed_beam_mmax > self.power_beam_lmax:
                raise ValueError(
                    "pointed_beam_mmax cannot be larger than power_beam_lmax."
                )
            if self.generate_pointing_contractor_on_init:
                self.generate_pointing_contractor()

    def generate_pointing_contractor(self):
        self.pointing_contractor = wigner_d(
            self.power_beam_lmax,
            self.pointed_beam_mmax,
            tirs_to_pointing(
                latitude=self.latitude,
                longitude=self.longitude,
                altitude=self.pointing_altitude,
                azimuth=self.pointing_azimuth,
                boresight=self.pointing_boresight,
            ).inv(),
        )

    def generate_baseline_cache(self):
        self.baseline_cache = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=self.frequencies_MHz,
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
            aberrate_baseline=self.aberrate_baseline,
        )
        self.baseline_lmax = self.baseline_cache.shape[1] - 1
        self._baseline_cache_T = transpose_alm(self.baseline_cache, (2, 1, 0))

    def set_bandlimits_from_baseline(self):
        if self.sky_lmax is None:
            # No warning here, as it's the default case.
            self.sky_lmax = self.baseline_lmax + self.power_beam_lmax
        elif self.sky_lmax > self.baseline_lmax + self.power_beam_lmax:
            self.sky_lmax = self.baseline_lmax + self.power_beam_lmax
            warnings.warn(
                "The value of sky_lmax is too large. It is changed to the sum of baseline_lmax and "
                "power_beam_lmax which is " + str(self.sky_lmax) + ".",
                stacklevel=2,  # points to caller
            )
        elif self.sky_lmax < self.baseline_lmax + self.power_beam_lmax:
            warnings.warn(
                "The value of sky_lmax is smaller than the optimal value given the "
                "baseline and beam which is "
                + str(self.baseline_lmax + self.power_beam_lmax)
                + ". Consider changing it.",
                stacklevel=2,
            )  # points to caller
        if self.sky_absm_limits[1] is None:
            self.sky_absm_limits = (
                0,
                min(self.baseline_mmax + self.power_beam_lmax, self.sky_lmax) + 1,
            )
            # No warning here as it's a very common case to want this auto-generated.
        else:
            if self.sky_absm_limits[1] > self.sky_lmax + 1:
                self.sky_absm_limits = list(self.sky_absm_limits)
                self.sky_absm_limits[1] = self.sky_lmax + 1
                warnings.warn(
                    "The upper limit for the mmodes of the sky cannot be larger than "
                    "sky_lmax + 1. It is changed to "
                    + str(self.sky_absm_limits[1])
                    + ".",
                    stacklevel=2,
                )  # points to caller
            if self.sky_absm_limits[1] - 1 != self.baseline_mmax + self.power_beam_lmax:
                warnings.warn(
                    "The upper limit for the mmodes of the sky does not correspond to the "
                    "optimal value given the baseline and beam which is "
                    + str(self.baseline_mmax + self.power_beam_lmax)
                    + ". Consider changing it.",
                    stacklevel=2,
                )  # points to caller
            self.sky_absm_limits = tuple(self.sky_absm_limits)

    def beam_linear_mmode_generator(
        self,
        sky_alms,
        release_gaunt_cache: bool = True,
        sky_alms_transposed: npt.NDArray[np.complex128] | None = None,
    ):
        sky_alms = extend_dimensions_if_one_batch(sky_alms, 3)
        if sky_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The sky_alms coefficients do not correspond to the number of frequency bins."
            )
        if sky_alms.shape[1] != self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the expected sky_lmax "
                "(baseline_lmax + power_beam_lmax)."
            )
        if sky_alms.shape[2] != 2 * self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the full harmonic orders "
                "of sky_lmax (baseline_lmax + power_beam_lmax) -> (ms = 2 * sky_lmax + 1)."
            )
        if self.baseline_cache is not None:
            self.generate_baseline_cache()
        if self.baseline_cache is None:
            raise ValueError("baseline_cache is None. It must be generated beforehand.")
        if self.pointing_contract and self.pointing_contractor is None:
            self.generate_pointing_contractor()
        alm2_transposed = getattr(self, "_baseline_cache_T", None)
        self.triple_sh_integrator.generate_integrator_cache_12(
            sky_alms,
            self.baseline_cache,
            contract3=self.pointing_contractor,
            release_gaunt_cache=release_gaunt_cache,
            batch_parallel_mode=self.batch_parallel_mode,
            alm1_transposed=sky_alms_transposed,
            alm2_transposed=alm2_transposed,
        )
        self.integrator_cache = self.triple_sh_integrator.linear_integrator_cache_12
        return self.integrator_cache

    def mmodes(self, beam_alms, sky_alms=None):
        beam_alms = extend_dimensions_if_one_batch(beam_alms, 3)
        if beam_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the number of frequency bins."
            )
        if beam_alms.shape[1] != self.power_beam_lmax + 1:
            raise ValueError(
                "The beam_alms coefficients do not correspond to the inputted power_beam_lmax."
            )
        if self.pointing_contract:
            if beam_alms.shape[2] != 2 * self.pointed_beam_mmax + 1:
                raise ValueError(
                    "The beam_alms coefficients do not correspond to the expected "
                    "pointed_beam_mmax."
                )
        else:
            if beam_alms.shape[2] != 2 * self.power_beam_lmax + 1:
                raise ValueError(
                    "The beam_alms coefficients do not correspond to the full harmonic orders "
                    "of power_beam_lmax -> (ms = 2 * power_beam_lmax + 1)."
                )
        if sky_alms is None and self.integrator_cache is None:
            raise ValueError("Must specify either sky_alms or integrator cache.")
        else:
            if sky_alms is None:
                integrator_cache = self.integrator_cache
            else:
                integrator_cache = self.beam_linear_mmode_generator(sky_alms=sky_alms)
            integrator_cache = extend_dimensions_if_one_batch(integrator_cache, 4)
            return self.triple_sh_integrator.batch_gaunt_integrate_cached_12(
                beam_alms, integrator_cache=integrator_cache
            )

    def visibilities(self, beam_alms, sky_alms=None, return_mmodes=False):
        mmodes = self.mmodes(beam_alms=beam_alms, sky_alms=sky_alms)
        vis = mmodes_to_visibilities(mmodes, self.sky_lmax, ms=self.sky_m_values)
        if return_mmodes:
            return vis, mmodes
        else:
            return vis

    def setup_zarr_store(
        self,
        store_location: str | Path,
        frequency_chunksize: int | None = None,
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        if self.pointing_contract:
            num_beam_ms = 2 * self.pointed_beam_mmax + 1
        else:
            num_beam_ms = 2 * self.power_beam_lmax + 1
        integrator_cache_shape = (
            self.frequencies_MHz.size,
            2 * self.sky_lmax + 1,
            self.power_beam_lmax + 1,
            num_beam_ms,
        )
        integrator_cache_chunks = (
            (
                frequency_chunksize
                if frequency_chunksize is not None
                else self.frequencies_MHz.size
            ),
            1,  # per m-chunking for parallel writes
            self.power_beam_lmax + 1,
            num_beam_ms,
        )
        self._setup_zarr_store(
            store_location=store_location,
            group_path=group_path,
            integrator_cache_shape=integrator_cache_shape,
            integrator_cache_chunks=integrator_cache_chunks,
            store_metadata_attrs=store_metadata_attrs,
        )

PowerFromVoltageBeams

Parameters:

Name Type Description Default
voltage_beam_lmax int
required
voltage_beam_mmax int | None
None
power_beam_lmax int | None
None
power_beam_mmax int | None
None
generate_cache_on_init bool
True

Attributes:

Name Type Description
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class PowerFromVoltageBeams:
    voltage_beam_lmax: int
    voltage_beam_mmax: int | None = None
    power_beam_lmax: int | None = None
    power_beam_mmax: int | None = None

    triple_sh_integrator: TripleSHIntegrator = field(init=False)
    generate_cache_on_init: bool = True


    def __attrs_post_init__(self):
        if self.voltage_beam_mmax is None:
            self.voltage_beam_mmax = self.voltage_beam_lmax
        elif self.voltage_beam_mmax > self.voltage_beam_lmax:
            self.voltage_beam_mmax = self.voltage_beam_lmax
            warnings.warn(
                "The value of the voltage beam mmax cannot be larger than the voltage beam lmax. "
                "It is changed to " + str(self.voltage_beam_lmax) + ".",
                stacklevel=2,  # points to caller
            )

        if self.power_beam_lmax is None:
            self.power_beam_lmax = 2 * self.voltage_beam_lmax
        elif not self.voltage_beam_lmax <= self.power_beam_lmax <= 2 * self.voltage_beam_lmax:
            warnings.warn(
                "The value of the power beam lmax should be in the range "
                "[min(VBi_lmax, VBj_lmax), VBi_lmax + VBj_lmax].",
                stacklevel=2,
            )

        if self.power_beam_mmax is None:
            self.power_beam_mmax = 2 * self.voltage_beam_mmax
        elif self.power_beam_mmax > self.power_beam_lmax:
            self.power_beam_mmax = self.power_beam_lmax
            warnings.warn(
                "The value of the power beam mmax cannot be larger than the power beam lmax. "
                "It is changed to " + str(self.power_beam_lmax) + ".",
                stacklevel=2,  # points to caller
            )
        if not self.voltage_beam_mmax <= self.power_beam_mmax <= 2 * self.voltage_beam_mmax:
            warnings.warn(
                "The value of the power beam mmax should be in the range "
                "[min(VBi_mmax, VBj_mmax), VBi_mmax + VBj_mmax]. ",
                stacklevel=2,  # points to caller
            )

        if self.generate_cache_on_init:
            self.generate_cache()
        else:
            self.update_integrator()

    def update_integrator(self):
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.voltage_beam_lmax,
            l2max=self.voltage_beam_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=(0, self.voltage_beam_mmax + 1),
            generate_cache_on_init=False,
        )

    def generate_cache(self, cache_type_tag: Literal["opt12", "generic"] = "opt12"):
        self.update_integrator()
        self.triple_sh_integrator.generate_gaunt_cache(cache_type_tag)

    def grid_batch_power_beam_alm(self, voltage_beam_alm_i, voltage_beam_alm_j):
        voltage_beam_alm_i = extend_dimensions_if_one_batch(voltage_beam_alm_i, 3)
        voltage_beam_alm_j = extend_dimensions_if_one_batch(voltage_beam_alm_j, 3)
        vb_i = set_bandlimits(
            voltage_beam_alm_i, lmax=self.power_beam_lmax, mmax=self.power_beam_lmax
        )
        vb_j = set_bandlimits(
            voltage_beam_alm_j, lmax=self.power_beam_lmax, mmax=self.power_beam_lmax
        )
        grids_i = batch_array_synthesis(vb_i, lmax=self.power_beam_lmax)
        grids_j = batch_array_synthesis(vb_j, lmax=self.power_beam_lmax)
        return set_bandlimits(
            batch_array_analysis(grids_i * np.conj(grids_j), lmax=self.power_beam_lmax),
            lmax=self.power_beam_lmax,
            mmax=self.power_beam_mmax,
        )

    def batch_power_beam_alm(
        self,
        voltage_beam_alm_i,
        voltage_beam_alm_j,
        batch_parallel_mode: Literal[
            "channel-opt", "channel", "gaunt", "gaunt-opt"
        ] = "channel-opt",
    ):
        # Conjugate j
        m2s = np.linspace(
            -self.voltage_beam_mmax,
            self.voltage_beam_mmax,
            2 * self.voltage_beam_mmax + 1,
        )
        ndim = voltage_beam_alm_j.ndim
        voltage_beam_alm_j = (
            voltage_beam_alm_j.conj()[..., ::-1]
            * (-1) ** m2s[(None,) * (ndim - 1) + (slice(None),)]
        )
        # Make m's full, eventually will not be necessary
        dm_volt = self.voltage_beam_lmax - self.voltage_beam_mmax
        if dm_volt > 0:
            voltage_beam_alm_i = np.pad(
                voltage_beam_alm_i, ((0, 0),) * (ndim - 1) + ((dm_volt, dm_volt),)
            )
            voltage_beam_alm_j = np.pad(
                voltage_beam_alm_j, ((0, 0),) * (ndim - 1) + ((dm_volt, dm_volt),)
            )

        self.triple_sh_integrator.generate_integrator_cache_12(
            voltage_beam_alm_i, voltage_beam_alm_j, batch_parallel_mode=batch_parallel_mode
        )
        power_beam_alm_unsummed = self.triple_sh_integrator.linear_integrator_cache_12
        if not np.isfinite(power_beam_alm_unsummed).all():
            raise ValueError("Power beam alm contains NaN values.")

        m3s = np.linspace(
            -self.power_beam_lmax, self.power_beam_lmax, 2 * self.power_beam_lmax + 1
        )
        ndim = power_beam_alm_unsummed.ndim
        # sum over the voltage beam ms, for 4 dims axis=1 for 3 dims axis=0 (no frequency axis)
        power_beam_alm = (
            power_beam_alm_unsummed.sum(axis=ndim - 3)[..., ::-1]
            * (-1) ** m3s[(None,) * (ndim - 2) + (slice(None),)]
        )

        # Keep the coefficients from -power_beam_mmax : power_beam_mmax
        power_beam_alm = power_beam_alm[
            ...,
            self.power_beam_lmax - self.power_beam_mmax : self.power_beam_lmax
            + self.power_beam_mmax + 1,
        ]
        return power_beam_alm

    def clear_cache(self):
        self.triple_sh_integrator.clear_gaunt_cache()

SingleVisibilityGenerator

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
frequency_MHz float
required
power_beam_alm ndarray[tuple[Any, ...], dtype[complex128]]
required
sky_alm ndarray[tuple[Any, ...], dtype[complex128]]
required
method Literal['gaunt', 'gaunt_sparse', 'grid']
'gaunt'
baseline_lmax int | None
None
aberrate_baseline bool
False
sky_absm_limits tuple[int, int | None]
(0, None)
baseline_alm None | ndarray[tuple[Any, ...], dtype[complex128]]
None

Attributes:

Name Type Description
sky_lmax int
power_beam_lmax int
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class SingleVisibilityGenerator:
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    frequency_MHz: float
    power_beam_alm: npt.NDArray[np.complex128]
    sky_alm: npt.NDArray[np.complex128]

    method: Literal["gaunt", "gaunt_sparse", "grid"] = "gaunt"
    baseline_lmax: int | None = None
    aberrate_baseline: bool = field(default=False, kw_only=True)

    sky_absm_limits: tuple[int, int | None] = (0, None)

    baseline_alm: None | npt.NDArray[np.complex128] = None

    sky_lmax: int = field(init=False)
    power_beam_lmax: int = field(init=False)
    triple_sh_integrator: TripleSHIntegrator = field(init=False)

    def generate_baseline_cache(self):
        self.baseline_alm = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=np.atleast_1d(self.frequency_MHz)[None, :],
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
            aberrate_baseline=self.aberrate_baseline,
        )[
            0, 0, ...
        ]  # TODO Figure out broadcasting here... The shape of the matrix:
        # [None from np.atleast_1s, len(frequencies_MHz)=1, lmax + 1, 2 * lmax + 1]
        self.baseline_lmax = self.baseline_alm.shape[0] - 1

    def __attrs_post_init__(self):
        if not isinstance(self.frequency_MHz, float | int):
            raise TypeError("frequency_MHz must be a float or integer")
        if self.baseline_alm is None:
            self.generate_baseline_cache()
        elif self.baseline_lmax is None:
            self.baseline_lmax = self.baseline_alm.shape[0] - 1
        else:
            if self.baseline_lmax != self.baseline_alm.shape[0] - 1:
                raise ValueError(
                    "The defined harmonic degree baseline_lmax does not "
                    "correspond to the defined baseline_alm coefficients"
                )
        self.sky_lmax = self.sky_alm.shape[0] - 1
        self.power_beam_lmax = self.power_beam_alm.shape[0] - 1
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )

    @property
    def sky_m_values(self):
        return self.triple_sh_integrator.m1_values

    @property
    def sky_m_index_map(self):
        return self.triple_sh_integrator.m1_index_map

    @property
    def num_sky_ms(self):
        return len(self.sky_m_values)

    def mmodes(
        self,
    ):
        if self.method == "grid":
            return self.triple_sh_integrator.grid_integrate(
                alm1=self.sky_alm, alm2=self.baseline_alm, alm3=self.power_beam_alm
            )
        elif self.method == "gaunt_sparse":
            return self.triple_sh_integrator.sparse_gaunt_einsum(
                sparse_alm1=sparse.COO.from_numpy(self.sky_alm),
                sparse_alm2=sparse.COO.from_numpy(self.baseline_alm),
                sparse_alm3=sparse.COO.from_numpy(self.power_beam_alm),
            )
        elif self.method == "gaunt":
            return self.triple_sh_integrator.gaunt_integrate(
                alm1=self.sky_alm, alm2=self.baseline_alm, alm3=self.power_beam_alm
            )

    def visibilities(self, return_mmodes=False):
        mmodes = self.mmodes()
        vis = mmodes_to_visibilities(mmodes, m1max=self.sky_lmax, ms=self.sky_m_values)
        if return_mmodes:
            return vis, mmodes
        else:
            return vis

SkyVisProjector

Bases: VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
sky_lmax int
required
frequencies_MHz ndarray
required
baseline_lmax int | None
None
power_beam_lmax int | None
None
sky_absm_limits list[int | None]
[0, None]
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
batch_parallel_mode Literal['channel', 'gaunt']
'channel'
aberrate_baseline bool
False
Source code in src/serval/core.py
@define(eq=False)
class SkyVisProjector(VisProjector):
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    sky_lmax: int
    frequencies_MHz: np.ndarray

    baseline_lmax: int | None = None
    power_beam_lmax: int | None = None
    sky_absm_limits: list[int | None] = field(factory=lambda: [0, None])
    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    batch_parallel_mode: Literal["channel", "gaunt"] = "channel"
    aberrate_baseline: bool = False

    _to_attrs = [
        "latitude",
        "longitude",
        "sky_lmax",
        "power_beam_lmax",
        "baseline_lmax",
        "baseline_enu",
        "aberrate_baseline",
    ]
    _to_zarr_store = ["frequencies_MHz"]

    def __attrs_post_init__(self):
        if self.generate_baseline_cache_on_init:
            self.generate_baseline_cache()
        if self.power_beam_lmax is None:
            if self.baseline_lmax is not None:
                if self.power_beam_lmax != self.sky_lmax - self.baseline_lmax:
                    warnings.warn(
                        "The value of power_beam_lmax is changed to the difference "
                        "of baseline_lmax from sky_lmax.",
                        stacklevel=2,  # points to caller
                    )
                    self.power_beam_lmax = self.sky_lmax - self.baseline_lmax
            else:
                raise ValueError(
                    "Cannot determine power_beam_lmax with inputs given.\n"
                    "Either baseline_cache needs to be computed on input, baseline_lmax must be "
                    "specified or power_beam_lmax must be specified."
                )
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )
        if self.generate_gaunt_cache_on_init:
            self.generate_gaunt_cache()

    def generate_baseline_cache(self):
        self.baseline_cache = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=self.frequencies_MHz,
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
            aberrate_baseline=self.aberrate_baseline,
        )
        # Can assert this is the same as input if not none.
        self.baseline_lmax = self.baseline_cache.shape[1] - 1
        # Only changed here:
        if self.power_beam_lmax is None:
            warnings.warn(
                "The value of power_beam_lmax is changed to the difference "
                "of baseline_lmax from sky_lmax.",
                stacklevel=2,  # points to caller
            )
            self.power_beam_lmax = self.sky_lmax - self.baseline_lmax

    def sky_linear_mmode_generator(self, beam_alms):
        beam_alms = extend_dimensions_if_one_batch(beam_alms, 3)
        if beam_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the number of frequency bins."
            )
        if (
            beam_alms.shape[1] != self.power_beam_lmax + 1
            or beam_alms.shape[2] != 2 * self.power_beam_lmax + 1
        ):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the expected power_beam_lmax "
                "(sky_lmax - baseline_lmax). Either number ls or ms is incorrect."
            )
        self.triple_sh_integrator.generate_integrator_cache_23(
            alm2=self.baseline_cache,
            alm3=beam_alms,
            batch_parallel_mode=self.batch_parallel_mode,
        )
        self.integrator_cache = self.triple_sh_integrator.linear_integrator_cache_23
        return self.integrator_cache

    def mmodes(self, sky_alms, beam_alms=None):
        sky_alms = extend_dimensions_if_one_batch(sky_alms, 3)
        if sky_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The sky_alms coefficients do not correspond to the number of frequency bins."
            )
        if sky_alms.shape[1] != self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the inputted sky_lmax."
            )
        if sky_alms.shape[2] != 2 * self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the expected harmonic orders "
                "for the inputted sky_lmax (ms = 2 * sky_lmax + 1)."
            )
        cut_sky_alms = sky_alms[
            ..., list(self.sky_m_index_map.keys())
        ]  # Need to select only the valid ms

        if beam_alms is None and self.integrator_cache is None:
            raise ValueError("Must specify either beam_alms or integrator cache.")
        else:
            if beam_alms is None:
                integrator_cache = self.integrator_cache
            else:
                integrator_cache = self.sky_linear_mmode_generator(beam_alms=beam_alms)
            integrator_cache = extend_dimensions_if_one_batch(integrator_cache, 3)
            return self.triple_sh_integrator.batch_gaunt_integrate_cached_23(
                alm1=cut_sky_alms, integrator_cache=integrator_cache
            )

    def visibilities(
        self,
        sky_alms,
        beam_alms=None,
        return_mmodes=False,
    ):
        mmodes = self.mmodes(beam_alms=beam_alms, sky_alms=sky_alms)
        vis = mmodes_to_visibilities(mmodes, self.sky_lmax, ms=self.sky_m_values)
        if return_mmodes:
            return vis, mmodes
        else:
            return vis

    def setup_zarr_store(
        self,
        store_location: str | Path,
        frequency_chunksize: int | None = None,
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        integrator_cache_shape = (
            self.frequencies_MHz.size,
            2 * self.sky_lmax + 1,
            self.sky_lmax + 1,
        )
        integrator_cache_chunks = (
            (
                frequency_chunksize
                if frequency_chunksize is not None
                else self.frequencies_MHz.size
            ),
            1,  # per m-chunking for parallel writes
            self.power_beam_lmax + 1,
        )
        self._setup_zarr_store(
            store_location=store_location,
            group_path=group_path,
            integrator_cache_shape=integrator_cache_shape,
            integrator_cache_chunks=integrator_cache_chunks,
            store_metadata_attrs=store_metadata_attrs,
        )

TripleSHIntegrator

A class for performing integrals of the products of three functions on the sphere expressed as their spherical harmonic transformations.

This class provides methods for this integration performed with a variety of techniques using Gaunt coefficients and grid-based approaches. It supports caching for efficient repeated computations when changing only one of the functions. Additionally it allows for computing the Fourier transform of these integrals in an azimuthal rotation angle of one of the functions (always the 1st) around the others, ie. the \(m\)-modes.

That is,

\[ \begin{align} \mathcal{V} &= \int\diff\Omega~f_1(\dirvec)f_2(\dirvec)f_3(\dirvec) \\ &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}} a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3} \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec) Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \end{align} \]

where

\[ f_1(\dirvec) = \sum_{\ell_1=0}^{l_{1,\mathrm{max}}} \sum_{m_1=-\ell_1}^{\ell_1} a_{\ell_1 m_1} Y_{\ell_1 m_1}(\dirvec) \]

etc.

For the \(m\)-modes:

\[ \begin{align} \mathcal{V}_{m_1} &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_2 m_3}} a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3} \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec) Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \\ &= \int \frac{\diff\phi}{2\pi} \exp[-im_1\phi] \int\diff\Omega~f_1( \matr{\mathcal{R}}_Z(-\phi)\dirvec)f_2(\dirvec)f_3(\dirvec) \end{align} \]

where \(\matr{\mathcal{R}}_Z(-\phi)\) is a passive basis rotation about the polar axis of the sphere.

Notes

The index of the functions only matters in differentiating from the others the first function/set of coefficients as the integration can be split up in \(m_1\) values. Additionally integrals can be performed rotating this function azimuthally around the others. That is, for the purposes of this code, this represents the sky.

Parameters:

Name Type Description Default
l1max int

Maximum spherical harmonic degree for the first set of coefficients.

required
l2max int

Maximum spherical harmonic degree for the second set of coefficients.

required
l3max int

Maximum spherical harmonic degree for the third set of coefficients.

required
absm1_limits tuple[int, int | None]

Inclusive lower and exclusive upper limits for the absolute value of the m1 index. Defaults to (0, None), ie, all m1 values.

(0, None)
generate_cache_on_init bool

Whether to generate the Gaunt coefficient cache during initialization, by default False.

False

Attributes:

Name Type Description
m1_values list[int]

List of m1 values that integrations are performed for.

m1_index_map dict[int, int]

Mapping of m1 values to their indices in a completed set of m1 values.

linear_integrator_cache_12 ndarray[tuple[Any, ...], dtype[complex128]] | None
linear_integrator_cache_23 ndarray[tuple[Any, ...], dtype[complex128]] | None
gaunt_cache CachingGauntContractor | CachingGauntContractorOpt12 | None
m1_global_index list[int]

Methods:

Name Description
generate_gaunt_cache

Generates the Gaunt coefficient cache.

clear_gaunt_cache

Clears the Gaunt coefficient cache.

sparse_gaunt_einsum

Performs sparse tensor contraction using computed Gaunt coefficients.

grid_integrate

Performs integration using a grid-based approach.

gaunt_integrate

Performs integration using a direct sum over precomputed Gaunt coefficients.

linear_gaunt_integrator_12

Computes a linear integrator for fixed first and second sets of coefficients.

generate_integrator_cache_12

batch_parallel_mode="channel") Generates and caches the linear integrator for fixed first and second sets of coefficients.

batch_gaunt_integrate_cached_12

Uses a integrator cache for fixed first and second sets of coefficients to compute integrals over a batch of the third set of coefficients.

linear_gaunt_integrator_23

Computes a linear integrator for fixed second and third sets of coefficients.

generate_integrator_cache_23

Generates and caches the linear integrator for fixed second and third sets of coefficients.

batch_gaunt_integrate_cached_12

Uses a integrator cache for fixed second and third sets of coefficients to compute integrals over a bacth of the first set of coefficients.

Source code in src/serval/core.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
@define(eq=False)
class TripleSHIntegrator:
    r"""
    A class for performing integrals of the products of three functions on the
    sphere expressed as their spherical harmonic transformations.

    This class provides methods for this integration performed with a variety
    of techniques using Gaunt coefficients and grid-based approaches. It
    supports caching for efficient repeated computations when changing only
    one of the functions. Additionally it allows for computing the Fourier
    transform of these integrals in an azimuthal rotation angle of one of
    the functions (always the 1st) around the others, ie. the $m$-modes.

    That is,

    $$
    \begin{align}
    \mathcal{V} &= \int\diff\Omega~f_1(\dirvec)f_2(\dirvec)f_3(\dirvec) \\
                &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                  a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}
                  \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec)
                  Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec),
    \end{align}
    $$

    where

    $$
    f_1(\dirvec) = \sum_{\ell_1=0}^{l_{1,\mathrm{max}}}
                   \sum_{m_1=-\ell_1}^{\ell_1} a_{\ell_1 m_1} Y_{\ell_1 m_1}(\dirvec)
    $$

    etc.

    For the $m$-modes:

    $$
    \begin{align}
    \mathcal{V}_{m_1} &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_2 m_3}}
                        a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}
                        \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec)
                        Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \\
                      &=
                        \int \frac{\diff\phi}{2\pi} \exp[-im_1\phi] \int\diff\Omega~f_1(
                        \matr{\mathcal{R}}_Z(-\phi)\dirvec)f_2(\dirvec)f_3(\dirvec)
    \end{align}
    $$

    where $\matr{\mathcal{R}}_Z(-\phi)$ is a passive basis rotation about the polar axis
    of the sphere.

    Notes
    -----
    The index of the functions only matters in differentiating from the others the first
    function/set of coefficients as the integration can be split up in $m_1$ values.
    Additionally integrals can be performed rotating this function azimuthally around
    the others. That is, for the purposes of this code, this represents the sky.


    Parameters
    ----------
    l1max : int
        Maximum spherical harmonic degree for the first set of coefficients.
    l2max : int
        Maximum spherical harmonic degree for the second set of coefficients.
    l3max : int
        Maximum spherical harmonic degree for the third set of coefficients.
    absm1_limits : tuple[int, int | None], optional
        Inclusive lower and exclusive upper limits for the absolute value of
        the m1 index. Defaults to (0, None), ie, all m1 values.
    generate_cache_on_init : bool, optional
        Whether to generate the Gaunt coefficient cache during initialization,
        by default False.

    Attributes
    ----------
    l1max : int
        Maximum spherical harmonic degree for the first set of coefficients.
    l2max : int
        Maximum spherical harmonic degree for the second set of coefficients.
    l3max : int
        Maximum spherical harmonic degree for the third set of coefficients.
    absm1_limits : tuple[int, int | None]
        Inclusive lower and exclusive upper limits for the absolute value of
        the m1 index.
    generate_cache_on_init : bool
        Whether to generate the Gaunt coefficient cache during initialization.
    m1_values : list[int]
        List of m1 values that integrations are performed for.
    m1_index_map : dict[int, int]
        Mapping of m1 values to their indices in a completed set of m1 values.

    Methods
    -------
    generate_gaunt_cache(cache_type=CachingGauntContractor)
        Generates the Gaunt coefficient cache.
    clear_gaunt_cache()
        Clears the Gaunt coefficient cache.
    sparse_gaunt_einsum(sparse_alm1, sparse_alm2, sparse_alm3, sum_m1=False, gaunts=None)
        Performs sparse tensor contraction using computed Gaunt coefficients.
    grid_integrate(alm1, alm2, alm3, sum_m1=False)
        Performs integration using a grid-based approach.
    gaunt_integrate(alm1, alm2, alm3, sum_m1=False)
        Performs integration using a direct sum over precomputed Gaunt coefficients.
    linear_gaunt_integrator_12(alm1, alm2, contract3=None, sum_m1=False)
        Computes a linear integrator for fixed first and second sets of coefficients.
    generate_integrator_cache_12(alm1, alm2, contract3=None, release_gaunt_cache=True,
                                 batch_parallel_mode="channel")
        Generates and caches the linear integrator for fixed first and second sets of
        coefficients.
    batch_gaunt_integrate_cached_12(alm3, integrator_cache=None, sum_m1=False)
        Uses a integrator cache for fixed first and second sets of coefficients to
        compute integrals over a batch of the third set of coefficients.
    linear_gaunt_integrator_23(alm2, alm3, sum_m1=False)
        Computes a linear integrator for fixed second and third sets of coefficients.
    generate_integrator_cache_23(alm2, alm3)
        Generates and caches the linear integrator for fixed second and third sets of
        coefficients.
    batch_gaunt_integrate_cached_12(alm1, integrator_cache=None, sum_m1=False)
        Uses a integrator cache for fixed second and third sets of coefficients to
        compute integrals over a bacth of the first set of coefficients.
    """

    l1max: int
    l2max: int
    l3max: int

    absm1_limits: tuple[int, int | None] = (0, None)
    generate_cache_on_init: bool = False

    linear_integrator_cache_12: npt.NDArray[np.complex128] | None = field(
        init=False, default=None
    )
    linear_integrator_cache_23: npt.NDArray[np.complex128] | None = field(
        init=False, default=None
    )
    gaunt_cache: CachingGauntContractor | CachingGauntContractorOpt12 | None = field(
        init=False, default=None
    )
    m1_values: list[int] = field(init=False)
    m1_index_map: dict[int, int] = field(init=False)
    m1_global_index: list[int] = field(init=False)

    _cache_types: ClassVar[
        dict[str, type[CachingGauntContractor] | type[CachingGauntContractorOpt12]]
    ] = {
        "opt12": CachingGauntContractorOpt12,
        "generic": CachingGauntContractor,
    }

    _process_contract: ClassVar[dict[str, Callable]] = {
        "gaunt-opt": contract_transpose,
        "channel-opt": contract_standard,
        "channel": contract_standard,
        "gaunt": contract_standard,
    }

    def __attrs_post_init__(self):
        if self.absm1_limits[1] is None:
            self.absm1_limits = (self.absm1_limits[0], self.l1max + 1)
        if self.generate_cache_on_init:
            self.generate_gaunt_cache()
        self.m1_values, self.m1_index_map = m1_indexing(
            self.l1max, self.absm1_limits[0], self.absm1_limits[1]
        )
        self.m1_global_index = list(self.m1_index_map.keys())

    def generate_gaunt_cache(
        self, cache_type_tag: Literal["opt12", "generic"] = "generic"
    ):
        """Generates the Gaunt coefficient cache."""
        self.gaunt_cache = self._cache_types[cache_type_tag](
            self.l1max,
            self.l2max,
            self.l3max,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def clear_gaunt_cache(self):
        """Clears the Gaunt coefficient cache."""
        self.gaunt_cache = None

    def sparse_gaunt_einsum(
        self,
        sparse_alm1: sparse.SparseArray,
        sparse_alm2: sparse.SparseArray,
        sparse_alm3: sparse.SparseArray,
        sum_m1: bool = False,
        gaunts: sparse.COO | None = None,
    ) -> sparse.COO:
        r"""
        Perform the triple integral using a sparse tensor contraction.

        Compute the triple integral by generating or using a sparse
        representation of the Gaunt coefficients and performing the sum
        with a sparse einsum. This is mostly a cross-checking function
        and is not performant.

        Parameters
        ----------
        sparse_alm1 : sparse.SparseArray
            Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).
        sparse_alm2 : sparse.SparseArray
            Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).
        sparse_alm3 : sparse.SparseArray
            Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.
        gaunts : sparse.COO | None, optional
            Sparse representation of the Gaunt coefficients if already.
            computed, by default None.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        if gaunts is None:
            gaunts = gaunts_coo(self.l1max, self.l2max, self.l3max)
        if sum_m1:
            return sparse.einsum(
                "ijklmn, ij, kl, mn -> ", gaunts, sparse_alm1, sparse_alm2, sparse_alm3
            )
        else:
            return sparse.einsum(
                "ijklmn, ij, kl, mn -> j",
                gaunts,
                sparse_alm1,
                sparse_alm2,
                sparse_alm3,
            ).todense()

    def grid_integrate(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        r"""Perform the triple integral using a grid-based approach.

        Here the product of the second and third fields are evaluated on a
        consistent resolution grid, decomposed to conjugate spherical
        harmonics and multipled by the first set of coefficients. This more
        standard m-mode approach is performant for single integrals
        but, as implemented here, can't cache intermediate results and
        therefore not suitable for batching.


        Parameters
        ---------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        alm3 : npt.NDArray[np.complex128]
            The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        grid_lmax = int(np.ceil(self.l1max + self.l2max + self.l3max))
        alm1, alm2, alm3 = broadcast_bandlimits(alm1, alm2, alm3, lmax=grid_lmax)
        static_grid = grid_template(grid_lmax)
        static_grid.data[:] = 1.0
        static_grid *= array_synthesis(alm2) * array_synthesis(alm3)
        static_alm = analysis(static_grid, conjugate=True, return_sparse=False)
        out_mmodes = compute_mmodes(static_alm, alm1)
        return out_mmodes[np.asarray(self.m1_values) + grid_lmax]

    def gaunt_integrate(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128] | float:
        r"""Perform the triple integral by doing and inplace
        sum-product over gaunt coefficients.

        $$
        \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                    \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                    a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}.
        $$

        Here the sum is performed inplace with no caching.

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        alm3 : npt.NDArray[np.complex128]
            The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        return gaunt_dot123(
            alm1=alm1,
            alm2=alm2,
            alm3=alm3,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def linear_gaunt_integrator_12(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        contract3: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        r"""Generate a linear operator that, with fixed first and second set of coefficients
        performs the integral when sum-producted with the third set of coefficients.

        $$
        \matr{L}_{m_1}^{\ell_3 m_3} =
                    \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}}
                    \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                    a_{\ell_1 m_1}a_{\ell_2 m_2}.
        $$

        If requested, additionally contract with a linear operator along the
        m3 axis (e.g. a rotation with a Wigner-d matrix),

        $$
        \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3}
                                             \matr{L}_{m_1}^{\ell_3 m_3}
        $$

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        contract3 : npt.NDArray[np.complex128] | None, optional
            An optional linear operator along the m3 axis, shape
            (l3max+1, 2*l3max+1, Nm3prime<=2*l3max+1), by default None
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128]
            By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array
            where the m1 axis has been summed over. If contract3 is present, this will instead
            be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent
            contraction on the m3 axis.
        """
        tensor = gaunt_dot12(
            alm1=alm1,
            alm2=alm2,
            l3max=self.l3max,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )
        if contract3 is not None:
            return integrator12_contract3(tensor, contract3)
        else:
            return tensor

    def generate_integrator_cache_12(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        contract3: npt.NDArray[np.complex128] | None = None,
        release_gaunt_cache: bool = True,
        batch_parallel_mode: Literal[
            "channel-opt", "channel", "gaunt", "gaunt-opt"
        ] = "channel-opt",
        alm1_transposed: npt.NDArray[np.complex128] | None = None,
        alm2_transposed: npt.NDArray[np.complex128] | None = None,
    ):
        """_summary_

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            _description_
        alm2 : npt.NDArray[np.complex128]
            _description_
        contract3 : npt.NDArray[np.complex128] | None, optional
            _description_, by default None
        release_gaunt_cache : bool, optional
            _description_, by default True
        batch_parallel_mode : Literal["channel-opt", "channel", "gaunt", "gaunt-opt"], optional
            _description_, by default "channel-opt"
        alm1_transposed : npt.NDArray[np.complex128] | None, optional
            Pre-transposed alm1 in (m, l, batch) layout, by default None
        alm2_transposed : npt.NDArray[np.complex128] | None, optional
            Pre-transposed alm2 in (m, l, batch) layout, by default None
        """
        cache_type_tag: Literal["opt12", "generic"] = (
            "opt12" if batch_parallel_mode in ("channel-opt", "gaunt-opt") else "generic"
        )
        if self.gaunt_cache is None:
            self.generate_gaunt_cache(cache_type_tag)
        if not isinstance(self.gaunt_cache, self._cache_types[cache_type_tag]):
            raise TypeError(
                f"gaunt_cache must be a {self._cache_types[cache_type_tag]} object."
            )
        alm1 = extend_dimensions_if_one_batch(alm1, 3)
        alm2 = extend_dimensions_if_one_batch(alm2, 3)
        batch_size = alm1.shape[0]
        n_m1 = len(self.m1_values)
        match batch_parallel_mode:
            case "gaunt-opt":
                result = empty_complex_array((n_m1, self.l3max + 1, 2 * self.l3max + 1, batch_size))
                alm1_T = transpose_alm(alm1, (2, 1, 0), alm1_transposed)
                alm2_T = transpose_alm(alm2, (2, 1, 0), alm2_transposed)
                self.gaunt_cache.gaunt_opt_batch_dot12(alm1_T, alm2_T, result)
            case "channel-opt" | "channel":
                result = empty_complex_array((batch_size, n_m1, self.l3max + 1, 2 * self.l3max + 1))
                alm1_T = transpose_alm(alm1, (0, 2, 1))
                alm2_T = transpose_alm(alm2, (0, 2, 1))
                self.gaunt_cache.batch_parallel_batch_dot12(alm1_T, alm2_T, result)
            case _:
                result = empty_complex_array((batch_size, n_m1, self.l3max + 1, 2 * self.l3max + 1))
                self.gaunt_cache.gaunt_parallel_batch_dot12(alm1, alm2, result)
        result = self._process_contract[batch_parallel_mode](result, contract3)
        if release_gaunt_cache:
            self.clear_gaunt_cache()
        self.linear_integrator_cache_12 = remove_dimension_if_one_batch(result)

    def batch_gaunt_integrate_cached_12(
        self,
        alm3: npt.NDArray[np.complex128],
        integrator_cache: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ):
        if integrator_cache is None:
            if self.linear_integrator_cache_12 is None:
                raise ValueError("Integrator cache must be provided or pre-computed")
            else:
                tensor = self.linear_integrator_cache_12
        else:
            tensor = integrator_cache
        result = integrator12_dot3(tensor, alm3)
        if sum_m1:
            return np.sum(result, axis=0)
        else:
            return result

    def linear_gaunt_integrator_23(
        self,
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        return gaunt_dot23(
            alm2=alm2,
            alm3=alm3,
            l1max=self.l1max,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def generate_integrator_cache_23(
        self,
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        release_gaunt_cache: bool = True,
        batch_parallel_mode: Literal["channel", "gaunt"] = "channel",
    ):
        if self.gaunt_cache is None:
            self.generate_gaunt_cache()
        if not isinstance(self.gaunt_cache, CachingGauntContractor):
            raise TypeError("gaunt_cache must be a CachingGauntContractor object.")
        alm2 = extend_dimensions_if_one_batch(alm2, 3)
        alm3 = extend_dimensions_if_one_batch(alm3, 3)
        result = empty_complex_array((alm2.shape[0], len(self.m1_values), self.l1max + 1))
        if batch_parallel_mode == "channel":
            self.gaunt_cache.batch_parallel_batch_dot23(alm2, alm3, result)
        else:
            self.gaunt_cache.gaunt_parallel_batch_dot23(alm2, alm3, result)
        if release_gaunt_cache:
            self.clear_gaunt_cache()
        self.linear_integrator_cache_23 = remove_dimension_if_one_batch(result)

    def batch_gaunt_integrate_cached_23(
        self,
        alm1: npt.NDArray[np.complex128],
        integrator_cache: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ):
        if integrator_cache is None:
            if self.linear_integrator_cache_23 is None:
                raise ValueError("Integrator cache must be provided or pre-computed")
            else:
                tensor = self.linear_integrator_cache_23
        else:
            tensor = integrator_cache
        result = integrator23_dot1(tensor, alm1)
        if sum_m1:
            return np.sum(result, axis=0)
        else:
            return result

clear_gaunt_cache()

Clears the Gaunt coefficient cache.

Source code in src/serval/core.py
def clear_gaunt_cache(self):
    """Clears the Gaunt coefficient cache."""
    self.gaunt_cache = None

gaunt_integrate(alm1, alm2, alm3, sum_m1=False)

Perform the triple integral by doing and inplace sum-product over gaunt coefficients.

\[ \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}} \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~ a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}. \]

Here the sum is performed inplace with no caching.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
alm3 NDArray[complex128]

The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def gaunt_integrate(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128] | float:
    r"""Perform the triple integral by doing and inplace
    sum-product over gaunt coefficients.

    $$
    \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}.
    $$

    Here the sum is performed inplace with no caching.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    alm3 : npt.NDArray[np.complex128]
        The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    return gaunt_dot123(
        alm1=alm1,
        alm2=alm2,
        alm3=alm3,
        sum_m1=sum_m1,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )

generate_gaunt_cache(cache_type_tag='generic')

Generates the Gaunt coefficient cache.

Source code in src/serval/core.py
def generate_gaunt_cache(
    self, cache_type_tag: Literal["opt12", "generic"] = "generic"
):
    """Generates the Gaunt coefficient cache."""
    self.gaunt_cache = self._cache_types[cache_type_tag](
        self.l1max,
        self.l2max,
        self.l3max,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )

generate_integrator_cache_12(alm1, alm2, contract3=None, release_gaunt_cache=True, batch_parallel_mode='channel-opt', alm1_transposed=None, alm2_transposed=None)

summary

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

description

required
alm2 NDArray[complex128]

description

required
contract3 NDArray[complex128] | None

description, by default None

None
release_gaunt_cache bool

description, by default True

True
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt', 'gaunt-opt']

description, by default "channel-opt"

'channel-opt'
alm1_transposed NDArray[complex128] | None

Pre-transposed alm1 in (m, l, batch) layout, by default None

None
alm2_transposed NDArray[complex128] | None

Pre-transposed alm2 in (m, l, batch) layout, by default None

None
Source code in src/serval/core.py
def generate_integrator_cache_12(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    contract3: npt.NDArray[np.complex128] | None = None,
    release_gaunt_cache: bool = True,
    batch_parallel_mode: Literal[
        "channel-opt", "channel", "gaunt", "gaunt-opt"
    ] = "channel-opt",
    alm1_transposed: npt.NDArray[np.complex128] | None = None,
    alm2_transposed: npt.NDArray[np.complex128] | None = None,
):
    """_summary_

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        _description_
    alm2 : npt.NDArray[np.complex128]
        _description_
    contract3 : npt.NDArray[np.complex128] | None, optional
        _description_, by default None
    release_gaunt_cache : bool, optional
        _description_, by default True
    batch_parallel_mode : Literal["channel-opt", "channel", "gaunt", "gaunt-opt"], optional
        _description_, by default "channel-opt"
    alm1_transposed : npt.NDArray[np.complex128] | None, optional
        Pre-transposed alm1 in (m, l, batch) layout, by default None
    alm2_transposed : npt.NDArray[np.complex128] | None, optional
        Pre-transposed alm2 in (m, l, batch) layout, by default None
    """
    cache_type_tag: Literal["opt12", "generic"] = (
        "opt12" if batch_parallel_mode in ("channel-opt", "gaunt-opt") else "generic"
    )
    if self.gaunt_cache is None:
        self.generate_gaunt_cache(cache_type_tag)
    if not isinstance(self.gaunt_cache, self._cache_types[cache_type_tag]):
        raise TypeError(
            f"gaunt_cache must be a {self._cache_types[cache_type_tag]} object."
        )
    alm1 = extend_dimensions_if_one_batch(alm1, 3)
    alm2 = extend_dimensions_if_one_batch(alm2, 3)
    batch_size = alm1.shape[0]
    n_m1 = len(self.m1_values)
    match batch_parallel_mode:
        case "gaunt-opt":
            result = empty_complex_array((n_m1, self.l3max + 1, 2 * self.l3max + 1, batch_size))
            alm1_T = transpose_alm(alm1, (2, 1, 0), alm1_transposed)
            alm2_T = transpose_alm(alm2, (2, 1, 0), alm2_transposed)
            self.gaunt_cache.gaunt_opt_batch_dot12(alm1_T, alm2_T, result)
        case "channel-opt" | "channel":
            result = empty_complex_array((batch_size, n_m1, self.l3max + 1, 2 * self.l3max + 1))
            alm1_T = transpose_alm(alm1, (0, 2, 1))
            alm2_T = transpose_alm(alm2, (0, 2, 1))
            self.gaunt_cache.batch_parallel_batch_dot12(alm1_T, alm2_T, result)
        case _:
            result = empty_complex_array((batch_size, n_m1, self.l3max + 1, 2 * self.l3max + 1))
            self.gaunt_cache.gaunt_parallel_batch_dot12(alm1, alm2, result)
    result = self._process_contract[batch_parallel_mode](result, contract3)
    if release_gaunt_cache:
        self.clear_gaunt_cache()
    self.linear_integrator_cache_12 = remove_dimension_if_one_batch(result)

grid_integrate(alm1, alm2, alm3, sum_m1=False)

Perform the triple integral using a grid-based approach.

Here the product of the second and third fields are evaluated on a consistent resolution grid, decomposed to conjugate spherical harmonics and multipled by the first set of coefficients. This more standard m-mode approach is performant for single integrals but, as implemented here, can't cache intermediate results and therefore not suitable for batching.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
alm3 NDArray[complex128]

The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def grid_integrate(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128]:
    r"""Perform the triple integral using a grid-based approach.

    Here the product of the second and third fields are evaluated on a
    consistent resolution grid, decomposed to conjugate spherical
    harmonics and multipled by the first set of coefficients. This more
    standard m-mode approach is performant for single integrals
    but, as implemented here, can't cache intermediate results and
    therefore not suitable for batching.


    Parameters
    ---------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    alm3 : npt.NDArray[np.complex128]
        The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    grid_lmax = int(np.ceil(self.l1max + self.l2max + self.l3max))
    alm1, alm2, alm3 = broadcast_bandlimits(alm1, alm2, alm3, lmax=grid_lmax)
    static_grid = grid_template(grid_lmax)
    static_grid.data[:] = 1.0
    static_grid *= array_synthesis(alm2) * array_synthesis(alm3)
    static_alm = analysis(static_grid, conjugate=True, return_sparse=False)
    out_mmodes = compute_mmodes(static_alm, alm1)
    return out_mmodes[np.asarray(self.m1_values) + grid_lmax]

linear_gaunt_integrator_12(alm1, alm2, contract3=None, sum_m1=False)

Generate a linear operator that, with fixed first and second set of coefficients performs the integral when sum-producted with the third set of coefficients.

\[ \matr{L}_{m_1}^{\ell_3 m_3} = \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}} \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~ a_{\ell_1 m_1}a_{\ell_2 m_2}. \]

If requested, additionally contract with a linear operator along the m3 axis (e.g. a rotation with a Wigner-d matrix),

\[ \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3} \matr{L}_{m_1}^{\ell_3 m_3} \]

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
contract3 NDArray[complex128] | None

An optional linear operator along the m3 axis, shape (l3max+1, 2l3max+1, Nm3prime<=2l3max+1), by default None

None
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128]

By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array where the m1 axis has been summed over. If contract3 is present, this will instead be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent contraction on the m3 axis.

Source code in src/serval/core.py
def linear_gaunt_integrator_12(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    contract3: npt.NDArray[np.complex128] | None = None,
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128]:
    r"""Generate a linear operator that, with fixed first and second set of coefficients
    performs the integral when sum-producted with the third set of coefficients.

    $$
    \matr{L}_{m_1}^{\ell_3 m_3} =
                \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}}
                \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                a_{\ell_1 m_1}a_{\ell_2 m_2}.
    $$

    If requested, additionally contract with a linear operator along the
    m3 axis (e.g. a rotation with a Wigner-d matrix),

    $$
    \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3}
                                         \matr{L}_{m_1}^{\ell_3 m_3}
    $$

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    contract3 : npt.NDArray[np.complex128] | None, optional
        An optional linear operator along the m3 axis, shape
        (l3max+1, 2*l3max+1, Nm3prime<=2*l3max+1), by default None
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128]
        By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array
        where the m1 axis has been summed over. If contract3 is present, this will instead
        be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent
        contraction on the m3 axis.
    """
    tensor = gaunt_dot12(
        alm1=alm1,
        alm2=alm2,
        l3max=self.l3max,
        sum_m1=sum_m1,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )
    if contract3 is not None:
        return integrator12_contract3(tensor, contract3)
    else:
        return tensor

sparse_gaunt_einsum(sparse_alm1, sparse_alm2, sparse_alm3, sum_m1=False, gaunts=None)

Perform the triple integral using a sparse tensor contraction.

Compute the triple integral by generating or using a sparse representation of the Gaunt coefficients and performing the sum with a sparse einsum. This is mostly a cross-checking function and is not performant.

Parameters:

Name Type Description Default
sparse_alm1 SparseArray

Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).

required
sparse_alm2 SparseArray

Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).

required
sparse_alm3 SparseArray

Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False
gaunts COO | None

Sparse representation of the Gaunt coefficients if already. computed, by default None.

None

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def sparse_gaunt_einsum(
    self,
    sparse_alm1: sparse.SparseArray,
    sparse_alm2: sparse.SparseArray,
    sparse_alm3: sparse.SparseArray,
    sum_m1: bool = False,
    gaunts: sparse.COO | None = None,
) -> sparse.COO:
    r"""
    Perform the triple integral using a sparse tensor contraction.

    Compute the triple integral by generating or using a sparse
    representation of the Gaunt coefficients and performing the sum
    with a sparse einsum. This is mostly a cross-checking function
    and is not performant.

    Parameters
    ----------
    sparse_alm1 : sparse.SparseArray
        Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).
    sparse_alm2 : sparse.SparseArray
        Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).
    sparse_alm3 : sparse.SparseArray
        Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.
    gaunts : sparse.COO | None, optional
        Sparse representation of the Gaunt coefficients if already.
        computed, by default None.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    if gaunts is None:
        gaunts = gaunts_coo(self.l1max, self.l2max, self.l3max)
    if sum_m1:
        return sparse.einsum(
            "ijklmn, ij, kl, mn -> ", gaunts, sparse_alm1, sparse_alm2, sparse_alm3
        )
    else:
        return sparse.einsum(
            "ijklmn, ij, kl, mn -> j",
            gaunts,
            sparse_alm1,
            sparse_alm2,
            sparse_alm3,
        ).todense()

VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
frequencies_MHz ndarray
required
power_beam_lmax int | None
None
baseline_lmax int | None
None
sky_lmax int | None
None
sky_absm_limits list[int | None]
[0, None]
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt', 'gaunt-opt']
'channel'

Attributes:

Name Type Description
baseline_mmax int | None
baseline_cache ndarray | None
integrator_cache ndarray | None
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class VisProjector:
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    frequencies_MHz: np.ndarray

    power_beam_lmax: int | None = None
    baseline_lmax: int | None = None
    baseline_mmax: int | None = field(init=False, default=None)
    sky_lmax: int | None = None

    sky_absm_limits: list[int | None] = field(factory=lambda: [0, None])

    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt", "gaunt-opt"] = "channel"

    baseline_cache: np.ndarray | None = field(init=False, default=None)
    integrator_cache: np.ndarray | None = field(init=False, default=None)

    triple_sh_integrator: TripleSHIntegrator = field(init=False)

    _last_store_location: Path | None = field(init=False, default=None)
    _last_group_path: str | None = field(init=False, default=None)

    _to_attrs: ClassVar[list[str]] = []
    _to_zarr_store: ClassVar[list[str]] = []
    _opt_tag: ClassVar[str] = ""

    def update_integrator(self):
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )

    def generate_gaunt_cache(self):
        if self.baseline_cache is None:
            self.generate_baseline_cache()
        self.update_integrator()
        if self.batch_parallel_mode in ("channel-opt", "gaunt-opt"):
            self.triple_sh_integrator.generate_gaunt_cache(self._opt_tag)
        else:
            self.triple_sh_integrator.generate_gaunt_cache("generic")

    @property
    def sky_m_values(self):
        return self.triple_sh_integrator.m1_values

    @property
    def sky_m_index_map(self):
        return self.triple_sh_integrator.m1_index_map

    @property
    def sky_m_global_index(self):
        return self.triple_sh_integrator.m1_global_index

    @property
    def sky_m_index_slices(self):
        m_inds = self.sky_m_global_index
        nm_inds = m_inds[: len(m_inds) // 2]
        if len(nm_inds) != 0:  # If ms == [0,]
            pm_inds = m_inds[len(m_inds) // 2 :]
            return (
                [slice(0, self.num_sky_ms // 2), slice(self.num_sky_ms // 2, None)],
                [
                    slice(nm_inds[0], nm_inds[-1] + 1),
                    slice(pm_inds[0], pm_inds[-1] + 1),
                ],
            )
        else:
            return [
                slice(0, 1),
            ], [
                slice(m_inds[0], m_inds[0] + 1),
            ]

    @property
    def num_sky_ms(self):
        return len(self.sky_m_values)

    def _setup_zarr_store(
        self,
        store_location: str | Path,
        integrator_cache_shape: tuple[int, ...],
        integrator_cache_chunks: tuple[int, ...],
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        to_attrs_dict = {slot: getattr(self, slot) for slot in self._to_attrs}
        to_zarr_store_dict = {slot: getattr(self, slot) for slot in self._to_zarr_store}

        destination_dir = Path(store_location)

        self._last_store_location = destination_dir
        self._last_group_path = group_path

        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.create_group(store=store, path=group_path, overwrite=True)

        assert self.sky_lmax is not None

        if store_metadata_attrs is not None:
            if group_path != r"/":  # coulf check this more thoroughly
                root = zarr.group(store=store, path="/")
                root.attrs.update(store_metadata_attrs)
            else:
                grp.attrs.update(store_metadata_attrs)

        for k, v in to_attrs_dict.items():
            grp.attrs[k] = v

        for k, v in to_zarr_store_dict.items():
            arr = grp.create_array(
                name=k,
                shape=v.shape,
                dtype=v.dtype,
                overwrite=True,
            )
            arr[:] = v

        ms_completed = grp.create_array(
            name="ms_completed",
            chunks=1,
            shape=2 * self.sky_lmax + 1,
            dtype=bool,
            overwrite=True,
        )
        ms_completed[:] = False

        grp.create_array(
            name="integrator_cache",
            shape=integrator_cache_shape,
            chunks=integrator_cache_chunks,
            dtype=np.complex128,
            overwrite=True,
            compressors=zarr.codecs.BloscCodec(**ZARR_COMPRESSORS),
        )

    def write_integrator_cache_to_store(
        self, store_location: str | Path | None = None, group_path: str | None = None
    ):
        if self.integrator_cache is None:
            raise ValueError("Cannot store integrator_cache because it is None.")
        if store_location is None and self._last_store_location is not None:
            destination_dir = self._last_store_location
        elif store_location is not None:
            destination_dir = Path(store_location)
        else:
            raise ValueError("Store location must be specified.")
        if group_path is None and self._last_group_path is not None:
            group_path = self._last_group_path
        elif group_path is None:
            group_path = r"/"  # Default to root group
        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.group(store=store, path=group_path)
        integrator_cache = grp["integrator_cache"]
        ms_completed = grp["ms_completed"]

        local_m_slices, global_m_slices = self.sky_m_index_slices
        cache = extend_dimensions_if_one_batch(
            self.integrator_cache, self.integrator_cache.ndim + (self.frequencies_MHz.size == 1)
        )
        for lslc, gslc in zip(local_m_slices, global_m_slices):  # noqa: B905
            integrator_cache[:, gslc] = cache[:, lslc]
            ms_completed[gslc] = True

    @classmethod
    def from_zarr_store(
        cls,
        store_location: str | Path,
        group_path: str = r"/",
        freq_slice: slice | None = None,
        sky_absm_limits=(0, None),
        load_cache=True,
    ):
        if freq_slice is None:
            freq_slice = slice(None, None, None)
        destination_dir = Path(store_location)
        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.group(store=store, path=group_path)
        attr_defaults = {
            "aberrate_baseline": False,
        }
        init_dict = {
            k: grp.attrs[k] if k in grp.attrs else attr_defaults[k]
            for k in cls._to_attrs
        }
        init_dict.update({k: grp[k][freq_slice] for k in cls._to_zarr_store})
        proj = cls(sky_absm_limits=sky_absm_limits, **init_dict)
        if load_cache:
            local_m_slices, global_m_slices = proj.sky_m_index_slices
            cache_shape = (
                proj.frequencies_MHz.size,
                len(proj.sky_m_values),
            ) + grp[
                "integrator_cache"
            ].shape[2:]
            proj.integrator_cache = empty_complex_array(cache_shape)
            for lslc, gslc in zip(local_m_slices, global_m_slices):
                # Load from global slice into local slice
                grp["integrator_cache"].get_basic_selection(
                    (freq_slice, gslc, ...),
                    out=NDBuffer(proj.integrator_cache[:, lslc, ...]),
                )
                if not grp["ms_completed"][gslc].all():
                    warnings.warn(
                        "Zarr Store indicates not all requested sky ms computed.",
                        stacklevel=2,
                    )  # points to caller
        return proj

extend_dimensions_if_one_batch(arr, dim)

If a single frequency is calculated then the alms and cache matrices will be missing the first dimension that corresponds to the number of frequencies. This function takes the arrays and adds a (1, ...) dimension

Parameters:

Name Type Description Default
arr NDArray[complex128]

The input array.

required
dim int

The desired number of dimensions.

required

Returns:

Type Description
NDArray[complex128]

The array with the correct number of dimensions.

Source code in src/serval/core.py
def extend_dimensions_if_one_batch(
    arr: npt.NDArray[np.complex128], dim: int
) -> npt.NDArray[np.complex128]:
    """If a single frequency is calculated then the alms and cache matrices will be missing
    the first dimension that corresponds to the number of frequencies. This function takes the
    arrays and adds a (1, ...) dimension
    Parameters
    ----------
    arr : npt.NDArray[np.complex128]
        The input array.
    dim : int
        The desired number of dimensions.
    Returns
    -------
    npt.NDArray[np.complex128]
        The array with the correct number of dimensions.
    """
    if arr.ndim < dim:
        arr = arr[None, ...]
    return arr

Rotation Utilities (serval.rotate.py)

batch_rotate_alm(alm, lmax, mmax, eulers_or_rotation)

Rotate a batch of SERVAL alm arrays using two calls to ducc0.sht.rotate_alm.

Converts the input to the ducc0 real-field decomposition, applies the rotation in batch, and converts back. The output always has mmax = lmax because a rotation generically populates all m-modes.

Parameters:

Name Type Description Default
alm NDArray[complex128]

SERVAL alm array of shape (..., lmax+1, 2*mmax+1).

required
lmax int

Maximum degree.

required
mmax int

Maximum order of the input.

required
eulers_or_rotation tuple[float, float, float] | Rotation

Passive ZYZ Euler angles (alpha, beta, gamma) in radians, or a scipy.spatial.transform.Rotation object.

required

Returns:

Type Description
NDArray[complex128]

SERVAL alm of shape (..., lmax+1, 2*lmax+1).

Source code in src/serval/rotate.py
def batch_rotate_alm(
    alm: npt.NDArray[np.complex128],
    lmax: int,
    mmax: int,
    eulers_or_rotation: EulersType | Rotation,
) -> npt.NDArray[np.complex128]:
    r"""Rotate a batch of SERVAL alm arrays using two calls to ``ducc0.sht.rotate_alm``.

    Converts the input to the ducc0 real-field decomposition, applies the rotation
    in batch, and converts back.  The output always has ``mmax = lmax`` because a
    rotation generically populates all m-modes.

    Parameters
    ----------
    alm : npt.NDArray[np.complex128]
        SERVAL alm array of shape ``(..., lmax+1, 2*mmax+1)``.
    lmax : int
        Maximum degree.
    mmax : int
        Maximum order of the input.
    eulers_or_rotation : tuple[float, float, float] | Rotation
        Passive ZYZ Euler angles ``(alpha, beta, gamma)`` in radians, or a
        ``scipy.spatial.transform.Rotation`` object.

    Returns
    -------
    npt.NDArray[np.complex128]
        SERVAL alm of shape ``(..., lmax+1, 2*lmax+1)``.
    """
    if isinstance(eulers_or_rotation, Rotation):
        alpha, beta, gamma = eulers_from_rotation(eulers_or_rotation)
    else:
        alpha, beta, gamma = eulers_or_rotation

    batch_shape = alm.shape[:-2]
    n_batch = int(np.prod(batch_shape)) if batch_shape else 1
    alm_2d = alm.reshape(n_batch, lmax + 1, 2 * mmax + 1)

    aR, aI = serval_to_ducc0_alm(alm_2d, lmax, mmax)
    # ducc0 uses an active rotation convention; negate angles to match passive ZYZ
    rR = ducc0.sht.rotate_alm(aR, lmax, -alpha, -beta, -gamma)
    rI = ducc0.sht.rotate_alm(aI, lmax, -alpha, -beta, -gamma) if np.any(aI) else aI

    rotated = ducc0_to_serval_alm(rR, rI, lmax)
    return rotated.reshape(*batch_shape, lmax + 1, 2 * lmax + 1)

boresight_for_fixed_pol_ref(alt, az, ref_alt=np.pi / 2, ref_az=np.pi, ref_boresight=0.0)

Boresight angle that aligns the feed co-pol axis with a reference pointing orientation.

When a dish is tilted away from a nominal pointing direction, the ZYZ Euler parameterisation induces an apparent rotation of the X/Y polarisation axes in TIRS. This function computes the boresight correction that eliminates that rotation, so that boresight = boresight_for_fixed_pol_ref(alt, az) + chi gives a pointing whose feed orientation differs from the reference only by the controllable roll chi.

Parameters:

Name Type Description Default
alt float

Pointing altitude in radians.

required
az float

Pointing azimuth in radians.

required
ref_alt float

Reference pointing altitude in radians. Defaults to pi/2 (zenith).

pi / 2
ref_az float

Reference pointing azimuth in radians. Defaults to pi (South).

pi
ref_boresight float

Reference boresight in radians. Defaults to 0.

0.0

Returns:

Type Description
float

Boresight angle in radians to pass to :func:zenith_to_pointing or :meth:TIRSVoltageBeam.from_ludwig3.

Source code in src/serval/rotate.py
def boresight_for_fixed_pol_ref(
    alt: float,
    az: float,
    ref_alt: float = np.pi / 2,
    ref_az: float = np.pi,
    ref_boresight: float = 0.0,
) -> float:
    r"""Boresight angle that aligns the feed co-pol axis with a reference pointing orientation.

    When a dish is tilted away from a nominal pointing direction, the ZYZ Euler
    parameterisation induces an apparent rotation of the X/Y polarisation axes in TIRS.
    This function computes the boresight correction that eliminates that rotation, so
    that ``boresight = boresight_for_fixed_pol_ref(alt, az) + chi`` gives a pointing whose
    feed orientation differs from the reference only by the controllable roll ``chi``.

    Parameters
    ----------
    alt : float
        Pointing altitude in radians.
    az : float
        Pointing azimuth in radians.
    ref_alt : float
        Reference pointing altitude in radians.  Defaults to ``pi/2`` (zenith).
    ref_az : float
        Reference pointing azimuth in radians.  Defaults to ``pi`` (South).
    ref_boresight : float
        Reference boresight in radians.  Defaults to ``0``.

    Returns
    -------
    float
        Boresight angle in radians to pass to :func:`zenith_to_pointing` or
        :meth:`TIRSVoltageBeam.from_ludwig3`.
    """
    x_ref = zenith_to_pointing(ref_alt, ref_az, ref_boresight).inv().apply(
        np.array([1.0, 0.0, 0.0])
    )
    R_new_active = zenith_to_pointing(alt, az, 0.0).inv()
    x_new0 = R_new_active.apply(np.array([1.0, 0.0, 0.0]))
    y_new0 = R_new_active.apply(np.array([0.0, 1.0, 0.0]))
    return float(np.arctan2(float(np.dot(x_ref, y_new0)), float(np.dot(x_ref, x_new0))))

cirs_to_cirs(source_epoch, target_epoch)

Return a passive rotation from CIRS at one epoch to CIRS at another.

Routes through GCRS using both CIO-based matrices: CIRS(source) \(\to\) GCRS \(\to\) CIRS(target).

Parameters:

Name Type Description Default
source_epoch str

The source epoch string (e.g. "J2000"), parsed by astropy.time.Time.

required
target_epoch str

The target epoch string, parsed by astropy.time.Time.

required

Returns:

Type Description
Rotation

A Rotation from CIRS(source) to CIRS(target).

Source code in src/serval/rotate.py
def cirs_to_cirs(source_epoch: str, target_epoch: str) -> Rotation:
    r"""Return a passive rotation from CIRS at one epoch to CIRS at another.

    Routes through GCRS using both CIO-based matrices:
    CIRS(source) $\to$ GCRS $\to$ CIRS(target).

    Parameters
    ----------
    source_epoch : str
        The source epoch string (e.g. ``"J2000"``), parsed by
        ``astropy.time.Time``.
    target_epoch : str
        The target epoch string, parsed by ``astropy.time.Time``.

    Returns
    -------
    Rotation
        A ``Rotation`` from CIRS(source) to CIRS(target).
    """
    c2i_target = gcrs_to_cirs(target_epoch).as_matrix()
    c2i_source = gcrs_to_cirs(source_epoch).as_matrix()
    R = c2i_target @ c2i_source.T
    return Rotation.from_matrix(R)

diurnal_aberrate_tirs(n, latitude, longitude)

Apply diurnal-aberration to TIRS unit directions.

Transforms unit direction vector(s) \(\hat{n}\) in the TIRS frame to account for the special-relativistic aberration caused by Earth's diurnal rotation. The observer velocity \(\boldsymbol{\beta} = \mathbf{v}/c\) in TIRS is exactly constant (independent of ERA or epoch), given by \(\boldsymbol{\beta} = (-\omega r_y,\; \omega r_x,\; 0) / c\) where \((r_x, r_y)\) are the WGS84 coordinates of the observer.

Parameters:

Name Type Description Default
n NDArray[float64]

Unit direction vector(s) in the TIRS frame. Shape (..., 3).

required
latitude float

Geodetic latitude in radians.

required
longitude float

Geodetic longitude in radians.

required

Returns:

Type Description
NDArray[float64]

Aberrated unit direction vector(s). Same shape as n.

Source code in src/serval/rotate.py
def diurnal_aberrate_tirs(
    n: npt.NDArray[np.float64],
    latitude: float,
    longitude: float,
) -> npt.NDArray[np.float64]:
    r"""Apply diurnal-aberration to TIRS unit directions.

    Transforms unit direction vector(s) $\hat{n}$ in the TIRS frame to account
    for the special-relativistic aberration caused by Earth's diurnal rotation.
    The observer velocity $\boldsymbol{\beta} = \mathbf{v}/c$ in TIRS is exactly
    constant (independent of ERA or epoch), given by
    $\boldsymbol{\beta} = (-\omega r_y,\; \omega r_x,\; 0) / c$
    where $(r_x, r_y)$ are the WGS84 coordinates of the observer.

    Parameters
    ----------
    n : npt.NDArray[np.float64]
        Unit direction vector(s) in the TIRS frame.  Shape ``(..., 3)``.
    latitude : float
        Geodetic latitude in radians.
    longitude : float
        Geodetic longitude in radians.

    Returns
    -------
    npt.NDArray[np.float64]
        Aberrated unit direction vector(s).  Same shape as ``n``.
    """
    # Earth rotation rate — IERS 2010 nominal value, consistent with ERFA/Astropy
    _OMEGA_EARTH_RAD_S = 7.292115085e-5
    loc = coords.EarthLocation.from_geodetic(
        lon=longitude * units.rad,
        lat=latitude * units.rad,
        height=0.0 * units.m,
    )
    rx = loc.x.to_value(units.m)
    ry = loc.y.to_value(units.m)
    beta = np.array([-_OMEGA_EARTH_RAD_S * ry, _OMEGA_EARTH_RAD_S * rx, 0.0])
    beta /= consts.c.to("m/s").value

    n = np.asarray(n, dtype=float)
    beta_sq = float(beta @ beta)
    gamma = 1.0 / np.sqrt(1.0 - beta_sq)
    dot = n @ beta
    scale = ((gamma - 1.0) * dot / beta_sq + gamma)[..., np.newaxis]
    aberrated = (n + scale * beta) / (gamma * (1.0 + dot))[..., np.newaxis]
    return aberrated / np.linalg.norm(aberrated, axis=-1, keepdims=True)

enu_to_tirs(latitude, longitude)

Creates a rotation object for basis rotations from the ENU coordinate frame of an observer at the specified latitude and longitude to the TIRS frame.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required

Returns:

Type Description
Rotation

A Rotation object representing the ENU to TIRS basis rotation.

Source code in src/serval/rotate.py
def enu_to_tirs(latitude: float, longitude: float) -> Rotation:
    r"""Creates a rotation object for basis rotations from the ENU coordinate frame of an observer
    at the specified latitude and longitude to the TIRS frame.
    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    Returns
    -------
    Rotation
        A Rotation object representing the ENU to TIRS basis rotation.
    """
    zenith_to_tirs = tirs_to_zenith(latitude=latitude, longitude=longitude).inv()
    return zenith_to_tirs * enu_to_zenith()

enu_to_zenith()

Provides a rotation object converting from the ENU basis to the local Zenith basis.

Returns:

Type Description
Rotation

A Rotation object.

Source code in src/serval/rotate.py
def enu_to_zenith() -> Rotation:
    r"""Provides a rotation object converting from the ENU basis
    to the local Zenith basis.
    Returns
    -------
    Rotation
        A Rotation object.
    """
    # fmt: off
    return Rotation.from_matrix([
        [0, -1, 0],
        [1,  0, 0],
        [0,  0, 1]]
    )

eulers_from_rotation(rotation)

Extracts the ZYZ Euler angles from a rotation object.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
rotation Rotation

A Rotation object.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the rotation.

Source code in src/serval/rotate.py
def eulers_from_rotation(rotation: Rotation) -> EulersType:
    r"""Extracts the ZYZ Euler angles from a rotation object.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    rotation : Rotation
        A Rotation object.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the rotation.
    """
    # .inv() for passive convention
    return tuple(rotation.inv().as_euler("ZYZ"))

frame_rotation_to_cirs(source_basis, epoch)

Return a passive rotation from a celestial coordinate frame to CIRS.

Dispatches to :func:icrs_to_cirs, :func:galactic_to_cirs, or :func:cirs_to_cirs based on source_basis.

Parameters:

Name Type Description Default
source_basis ``"ICRS"``, ``"Galactic"``, or ``"CIRS:<epoch>"``

The source coordinate frame. For CIRS input, append the old epoch after a colon, e.g. "CIRS:J2000".

required
epoch str

The target epoch string (e.g. "J2000"), parsed by astropy.time.Time.

required

Returns:

Type Description
Rotation

A Rotation from the source frame to CIRS at the requested epoch.

Raises:

Type Description
ValueError

If source_basis is not recognised.

Source code in src/serval/rotate.py
def frame_rotation_to_cirs(
    source_basis: str,
    epoch: str,
) -> Rotation:
    r"""Return a passive rotation from a celestial coordinate frame to CIRS.

    Dispatches to :func:`icrs_to_cirs`, :func:`galactic_to_cirs`, or
    :func:`cirs_to_cirs` based on ``source_basis``.

    Parameters
    ----------
    source_basis : ``"ICRS"``, ``"Galactic"``, or ``"CIRS:<epoch>"``
        The source coordinate frame. For CIRS input, append the old
        epoch after a colon, e.g. ``"CIRS:J2000"``.
    epoch : str
        The target epoch string (e.g. ``"J2000"``), parsed by
        ``astropy.time.Time``.

    Returns
    -------
    Rotation
        A ``Rotation`` from the source frame to CIRS at the
        requested epoch.

    Raises
    ------
    ValueError
        If ``source_basis`` is not recognised.
    """
    if source_basis == "ICRS":
        return icrs_to_cirs(epoch)
    elif source_basis == "Galactic":
        return galactic_to_cirs(epoch)
    elif source_basis.startswith("CIRS:"):
        source_epoch = source_basis.split(":", 1)[1]
        return cirs_to_cirs(source_epoch, epoch)
    else:
        raise ValueError(
            f"Unsupported source_basis '{source_basis}'. "
            "Expected 'ICRS', 'Galactic', or 'CIRS:<epoch>'."
        )

galactic_to_cirs(epoch)

Return a passive rotation from Galactic coordinates to CIRS.

The Galactic-to-ICRS rotation (from astropy) is composed with the CIO-based celestial-to-intermediate matrix. Aberration is not included.

Parameters:

Name Type Description Default
epoch str

The target epoch string (e.g. "J2000"), parsed by astropy.time.Time.

required

Returns:

Type Description
Rotation

A Rotation from Galactic to CIRS.

Source code in src/serval/rotate.py
def galactic_to_cirs(epoch: str) -> Rotation:
    r"""Return a passive rotation from Galactic coordinates to CIRS.

    The Galactic-to-ICRS rotation (from astropy) is composed with the
    CIO-based celestial-to-intermediate matrix. Aberration is not included.

    Parameters
    ----------
    epoch : str
        The target epoch string (e.g. ``"J2000"``), parsed by
        ``astropy.time.Time``.

    Returns
    -------
    Rotation
        A ``Rotation`` from Galactic to CIRS.
    """
    from astropy.coordinates import ICRS, CartesianRepresentation, Galactic

    basis_xyz = CartesianRepresentation(
        x=[1, 0, 0],
        y=[0, 1, 0],
        z=[0, 0, 1],
    )
    gal_coords = Galactic().realize_frame(basis_xyz)
    icrs_coords = gal_coords.transform_to(ICRS())
    gal_to_icrs = np.array(icrs_coords.cartesian.xyz.value)

    return Rotation.from_matrix(gcrs_to_cirs(epoch).as_matrix() @ gal_to_icrs)

gcrs_to_cirs(epoch)

Creates a rotation object for basis rotations from the GCRS frame to CIRS.

Uses the IAU 2006/2000A CIO-based celestial-to-intermediate matrix (via ERFA's c2i06a).

Parameters:

Name Type Description Default
epoch str

The epoch string (e.g. "J2000"), parsed by astropy.time.Time.

required

Returns:

Type Description
Rotation

A Rotation from GCRS to CIRS.

Source code in src/serval/rotate.py
def gcrs_to_cirs(epoch: str) -> Rotation:
    r"""Creates a rotation object for basis rotations from the GCRS frame to CIRS.

    Uses the IAU 2006/2000A CIO-based celestial-to-intermediate matrix
    (via ERFA's ``c2i06a``).

    Parameters
    ----------
    epoch : str
        The epoch string (e.g. ``"J2000"``), parsed by
        ``astropy.time.Time``.

    Returns
    -------
    Rotation
        A ``Rotation`` from GCRS to CIRS.
    """

    obstime = Time(epoch)
    jd1, jd2 = obstime.tt.jd1, obstime.tt.jd2
    c2i = erfa.c2i06a(jd1, jd2)
    return Rotation.from_matrix(c2i)

generate_telescope_wigner_ds(beam_lmax, beam_mmax, baseline_lmax, latitude, longitude, altitude, azimuth, baseline_enu)

Generates Wigner-D matrices for the TIRS to telescope pointing frame and T IRS to baseline frame basis rotations.

Parameters:

Name Type Description Default
beam_lmax int

The maximum degree of the beam Wigner-D matrix.

required
beam_mmax int

The maximum order of m' for the beam Wigner-D matrix.

required
baseline_lmax int

The maximum degree of the baseline Wigner-D matrix.

required
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
altitude float

The altitude angle in radians.

required
azimuth float

The azimuth angle in radians.

required
baseline_enu tuple[float, float, float]

The baseline vector components in the ENU frame (east, north, up).

required

Returns:

Type Description
tuple[NDArray[complex128], NDArray[complex128]]

The Wigner-D matrices for the TIRS to telescope pointing frame and TIRS to baseline frame basis rotations.

Source code in src/serval/rotate.py
def generate_telescope_wigner_ds(
    beam_lmax: int,
    beam_mmax: int,
    baseline_lmax: int,
    latitude: float,
    longitude: float,
    altitude: float,
    azimuth: float,
    baseline_enu: tuple[float, float, float],
) -> tuple[npt.NDArray[np.complex128], npt.NDArray[np.complex128]]:
    r"""Generates Wigner-D matrices for the TIRS to telescope pointing frame and T
    IRS to baseline frame basis rotations.
    Parameters
    ----------
    beam_lmax : int
        The maximum degree of the beam Wigner-D matrix.
    beam_mmax : int
        The maximum order of m' for the beam Wigner-D matrix.
    baseline_lmax : int
        The maximum degree of the baseline Wigner-D matrix.
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    altitude : float
        The altitude angle in radians.
    azimuth : float
        The azimuth angle in radians.
    baseline_enu : tuple[float, float, float]
        The baseline vector components in the ENU frame (east, north, up).
    Returns
    -------
    tuple[npt.NDArray[np.complex128], npt.NDArray[np.complex128]]
        The Wigner-D matrices for the TIRS to telescope pointing frame and TIRS to baseline
        frame basis rotations.
    """
    print("Generating Pointing Wigner-D...")
    wigner_d_pointing = wigner_d(
        beam_lmax,
        beam_mmax,
        tirs_to_pointing(
            latitude=latitude, longitude=longitude, altitude=altitude, azimuth=azimuth
        ),
    )
    print("Generating Baseline Wigner-D...")
    wigner_d_baseline = wigner_d(
        baseline_lmax,
        0,
        tirs_to_baseline_direc(
            latitude=latitude,
            longitude=longitude,
            east=baseline_enu[0],
            north=baseline_enu[1],
            up=baseline_enu[2],
        ),
    )
    return wigner_d_pointing, wigner_d_baseline

icrs_to_cirs(epoch)

Return a passive rotation from ICRS to CIRS at the given epoch.

Delegates to :func:gcrs_to_cirs as for the purposes of this code, the GCRS to ICRS transformation is neglible, mainly aberration is not ignored limiting the accuracy to ~20.5 arcsec.

Parameters:

Name Type Description Default
epoch str

The target epoch string (e.g. "J2000"), parsed by astropy.time.Time.

required

Returns:

Type Description
Rotation

A Rotation from ICRS to CIRS.

Source code in src/serval/rotate.py
def icrs_to_cirs(epoch: str) -> Rotation:
    r"""Return a passive rotation from ICRS to CIRS at the given epoch.

    Delegates to :func:`gcrs_to_cirs` as for the purposes of this code,
    the GCRS to ICRS transformation is neglible, mainly aberration is
    not ignored limiting the accuracy to ~20.5 arcsec.

    Parameters
    ----------
    epoch : str
        The target epoch string (e.g. ``"J2000"``), parsed by
        ``astropy.time.Time``.

    Returns
    -------
    Rotation
        A ``Rotation`` from ICRS to CIRS.
    """
    return gcrs_to_cirs(epoch)

inv_eulers(eulers)

Provides the Euler angles corresponding to the inverse rotation of those given.

Parameters:

Name Type Description Default
eulers tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the rotation.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the inverse rotation.

Source code in src/serval/rotate.py
def inv_eulers(eulers: EulersType) -> EulersType:
    r"""Provides the Euler angles corresponding to the inverse rotation of those given.
    Parameters
    ----------
    eulers : tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the rotation.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the inverse rotation.
    """
    return (-eulers[2], -eulers[1], -eulers[0])

itrs_enu_to_tirs_enu(geodetic_lat, geodetic_lon, enu, epoch)

Convert an ENU vector from the ITRS frame to its TIRS-frame ENU components.

Computes \(W^\top \mathbf{v}_\mathrm{ITRS}\) (the exact TIRS Cartesian vector) then re-expresses it as ENU components in the TIRS frame at the polar-motion-corrected (lat_tirs, lon_tirs). The result satisfies:

enu_to_tirs(lat_tirs, lon_tirs).apply(enu_tirs) == W^T @ enu_to_tirs(lat, lon).apply(enu)

so that passing (lat_tirs, lon_tirs, enu_tirs) to SERVAL functions that internally call enu_to_tirs yields the physically correct TIRS Cartesian vector.

Parameters:

Name Type Description Default
geodetic_lat float

Observer WGS84 geodetic latitude in radians.

required
geodetic_lon float

Observer WGS84 geodetic longitude in radians.

required
enu (array - like, shape(3))

ENU vector in the ITRS frame (e.g. baseline in metres).

required
epoch Time

Epoch at which the IERS polar motion is evaluated.

required

Returns:

Type Description
(ndarray, shape(3))

ENU components in the TIRS frame at (lat_tirs, lon_tirs).

Source code in src/serval/rotate.py
def itrs_enu_to_tirs_enu(
    geodetic_lat: float,
    geodetic_lon: float,
    enu: npt.ArrayLike,
    epoch: Time,
) -> npt.NDArray[np.float64]:
    r"""Convert an ENU vector from the ITRS frame to its TIRS-frame ENU components.

    Computes $W^\top \mathbf{v}_\mathrm{ITRS}$ (the exact TIRS Cartesian vector) then
    re-expresses it as ENU components in the TIRS frame at the polar-motion-corrected
    ``(lat_tirs, lon_tirs)``.  The result satisfies:

    ``enu_to_tirs(lat_tirs, lon_tirs).apply(enu_tirs) == W^T @ enu_to_tirs(lat, lon).apply(enu)``

    so that passing ``(lat_tirs, lon_tirs, enu_tirs)`` to SERVAL functions that internally
    call [enu_to_tirs][serval.rotate.enu_to_tirs] yields the physically correct TIRS
    Cartesian vector.

    Parameters
    ----------
    geodetic_lat : float
        Observer WGS84 geodetic latitude in radians.
    geodetic_lon : float
        Observer WGS84 geodetic longitude in radians.
    enu : array-like, shape (3,)
        ENU vector in the ITRS frame (e.g. baseline in metres).
    epoch : astropy.time.Time
        Epoch at which the IERS polar motion is evaluated.

    Returns
    -------
    ndarray, shape (3,)
        ENU components in the TIRS frame at ``(lat_tirs, lon_tirs)``.
    """
    R = itrs_to_tirs(epoch)
    bl_itrs = enu_to_tirs(geodetic_lat, geodetic_lon).apply(np.asarray(enu, dtype=float))
    bl_tirs = R.apply(bl_itrs)

    # Can't use spherical_to_normal due to circular imports...
    up_itrs = np.array([
        np.cos(geodetic_lat) * np.cos(geodetic_lon),
        np.cos(geodetic_lat) * np.sin(geodetic_lon),
        np.sin(geodetic_lat),
    ])
    up_tirs = R.apply(up_itrs)
    # Can't use normal_to_spherical due to circular imports...
    lat_tirs = float(np.arcsin(up_tirs[2]))
    lon_tirs = float(np.arctan2(up_tirs[1], up_tirs[0]))

    return enu_to_tirs(lat_tirs, lon_tirs).inv().apply(bl_tirs)

itrs_pointing_to_tirs_pointing(geodetic_lat, geodetic_lon, altitude, azimuth, boresight, epoch)

Convert an ITRS-frame pointing direction and boresight to their TIRS equivalents.

Converts (altitude, azimuth) by expressing the pointing unit vector in ITRS ENU, applying itrs_enu_to_tirs_enu, and extracting TIRS spherical coordinates. The boresight is corrected by tracking the ITRS pointing-frame x-axis into TIRS ENU and extracting the equivalent angle — the same technique used by boresight_for_fixed_pol_ref.

Parameters:

Name Type Description Default
geodetic_lat float

Observer WGS84 geodetic latitude in radians.

required
geodetic_lon float

Observer WGS84 geodetic longitude in radians.

required
altitude float

Pointing altitude in the ITRS ENU frame, in radians.

required
azimuth float

Pointing azimuth (North through East) in the ITRS ENU frame, in radians.

required
boresight float

Feed boresight rotation around the pointing axis in radians.

required
epoch Time

Epoch at which the IERS polar motion is evaluated.

required

Returns:

Type Description
tuple[float, float, float]

(altitude_tirs, azimuth_tirs, boresight_tirs) in radians.

Source code in src/serval/rotate.py
def itrs_pointing_to_tirs_pointing(
    geodetic_lat: float,
    geodetic_lon: float,
    altitude: float,
    azimuth: float,
    boresight: float,
    epoch: Time,
) -> tuple[float, float, float]:
    r"""Convert an ITRS-frame pointing direction and boresight to their TIRS equivalents.

    Converts ``(altitude, azimuth)`` by expressing the pointing unit vector in ITRS ENU,
    applying [itrs_enu_to_tirs_enu][serval.rotate.itrs_enu_to_tirs_enu], and extracting
    TIRS spherical coordinates.  The boresight is corrected by tracking the ITRS
    pointing-frame x-axis into TIRS ENU and extracting the equivalent angle — the same
    technique used by [boresight_for_fixed_pol_ref][serval.rotate.boresight_for_fixed_pol_ref].

    Parameters
    ----------
    geodetic_lat : float
        Observer WGS84 geodetic latitude in radians.
    geodetic_lon : float
        Observer WGS84 geodetic longitude in radians.
    altitude : float
        Pointing altitude in the ITRS ENU frame, in radians.
    azimuth : float
        Pointing azimuth (North through East) in the ITRS ENU frame, in radians.
    boresight : float
        Feed boresight rotation around the pointing axis in radians.
    epoch : astropy.time.Time
        Epoch at which the IERS polar motion is evaluated.

    Returns
    -------
    tuple[float, float, float]
        ``(altitude_tirs, azimuth_tirs, boresight_tirs)`` in radians.
    """
    # Can't use spherical_to_normal due to circular imports...
    enu = np.array([
        np.sin(azimuth) * np.cos(altitude),
        np.cos(azimuth) * np.cos(altitude),
        np.sin(altitude),
    ])
    enu_tirs = itrs_enu_to_tirs_enu(geodetic_lat, geodetic_lon, enu, epoch)
    # Can't use normal_to_spherical due to circular imports...
    alt_tirs = float(np.arcsin(np.clip(enu_tirs[2], -1.0, 1.0)))
    az_tirs = float(np.arctan2(enu_tirs[0], enu_tirs[1]))

    x_ref_itrs = zenith_to_pointing(altitude, azimuth, boresight).inv().apply(
        np.array([1.0, 0.0, 0.0])
    )
    x_ref_tirs = itrs_enu_to_tirs_enu(geodetic_lat, geodetic_lon, x_ref_itrs, epoch)
    R_tirs_zero = zenith_to_pointing(alt_tirs, az_tirs, 0.0).inv()
    x_tirs0 = R_tirs_zero.apply(np.array([1.0, 0.0, 0.0]))
    y_tirs0 = R_tirs_zero.apply(np.array([0.0, 1.0, 0.0]))
    boresight_tirs = float(np.arctan2(np.dot(x_ref_tirs, y_tirs0), np.dot(x_ref_tirs, x_tirs0)))

    return alt_tirs, az_tirs, boresight_tirs

itrs_to_tirs(epoch)

Rotation from ITRS to TIRS at a given epoch, applying IERS polar motion.

Parameters:

Name Type Description Default
epoch Time

Epoch at which the IERS polar motion is evaluated.

required

Returns:

Type Description
Rotation

Rotation that maps ITRS Cartesian vectors to TIRS Cartesian vectors.

Source code in src/serval/rotate.py
def itrs_to_tirs(epoch: Time) -> Rotation:
    """Rotation from ITRS to TIRS at a given epoch, applying IERS polar motion.

    Parameters
    ----------
    epoch : astropy.time.Time
        Epoch at which the IERS polar motion is evaluated.

    Returns
    -------
    Rotation
        Rotation that maps ITRS Cartesian vectors to TIRS Cartesian vectors.
    """
    xp, yp = get_polar_motion(epoch)
    sp = erfa.sp00(epoch.tt.jd1, epoch.tt.jd2)
    return Rotation.from_matrix(erfa.pom00(xp, yp, sp).T)

itrs_to_tirs_inputs(geodetic_lat, geodetic_lon, altitude, azimuth, boresight, baselines_enu, epoch)

Convert all ITRS observer inputs to their TIRS equivalents in one call.

Applies the IERS polar motion matrix \(W^\top\) to the site location, each baseline ENU vector, and the pointing direction.

Delegates to itrs_to_tirs_latlon, itrs_pointing_to_tirs_pointing, and itrs_enu_to_tirs_enu.

Parameters:

Name Type Description Default
geodetic_lat float

Observer WGS84 geodetic latitude in radians.

required
geodetic_lon float

Observer WGS84 geodetic longitude in radians.

required
altitude float

Pointing altitude in the ITRS ENU frame, in radians.

required
azimuth float

Pointing azimuth (North through East) in the ITRS ENU frame, in radians.

required
boresight float

Feed boresight rotation around the pointing axis, in radians.

required
baselines_enu list of array-like, each shape (3,)

Baseline ENU vectors in the ITRS frame.

required
epoch Time

Epoch at which the IERS polar motion is evaluated.

required

Returns:

Type Description
tuple

(latitude, longitude, altitude, azimuth, boresight, baselines_enu) all expressed in the TIRS frame.

Source code in src/serval/rotate.py
def itrs_to_tirs_inputs(
    geodetic_lat: float,
    geodetic_lon: float,
    altitude: float,
    azimuth: float,
    boresight: float,
    baselines_enu: list[npt.ArrayLike],
    epoch: Time,
) -> tuple[float, float, float, float, float, list[npt.NDArray[np.float64]]]:
    r"""Convert all ITRS observer inputs to their TIRS equivalents in one call.

    Applies the IERS polar motion matrix $W^\top$ to the site location, each baseline
    ENU vector, and the pointing direction.

    Delegates to [itrs_to_tirs_latlon][serval.rotate.itrs_to_tirs_latlon],
    [itrs_pointing_to_tirs_pointing][serval.rotate.itrs_pointing_to_tirs_pointing],
    and [itrs_enu_to_tirs_enu][serval.rotate.itrs_enu_to_tirs_enu].

    Parameters
    ----------
    geodetic_lat : float
        Observer WGS84 geodetic latitude in radians.
    geodetic_lon : float
        Observer WGS84 geodetic longitude in radians.
    altitude : float
        Pointing altitude in the ITRS ENU frame, in radians.
    azimuth : float
        Pointing azimuth (North through East) in the ITRS ENU frame, in radians.
    boresight : float
        Feed boresight rotation around the pointing axis, in radians.
    baselines_enu : list of array-like, each shape (3,)
        Baseline ENU vectors in the ITRS frame.
    epoch : astropy.time.Time
        Epoch at which the IERS polar motion is evaluated.

    Returns
    -------
    tuple
        ``(latitude, longitude, altitude, azimuth, boresight, baselines_enu)``
        all expressed in the TIRS frame.
    """
    lat_tirs, lon_tirs = itrs_to_tirs_latlon(geodetic_lat, geodetic_lon, epoch)
    alt_tirs, az_tirs, boresight_tirs = itrs_pointing_to_tirs_pointing(
        geodetic_lat, geodetic_lon, altitude, azimuth, boresight, epoch
    )
    bls_tirs = [
        itrs_enu_to_tirs_enu(geodetic_lat, geodetic_lon, bl, epoch)
        for bl in baselines_enu
    ]
    return lat_tirs, lon_tirs, alt_tirs, az_tirs, boresight_tirs, bls_tirs

itrs_to_tirs_latlon(geodetic_lat, geodetic_lon, epoch)

TIRS geocentric latitude and longitude for an ITRS observer at a given epoch.

Applies the IERS polar motion matrix \(W = \mathrm{erfa.pom00}\) to the geodetic zenith direction in ITRS, then returns its TIRS spherical coordinates. Passing the returned angles to enu_to_tirs or tirs_to_zenith makes those functions consistent with Astropy's ITRS coordinate chain.

Parameters:

Name Type Description Default
geodetic_lat float

Observer WGS84 geodetic latitude in radians.

required
geodetic_lon float

Observer WGS84 geodetic longitude in radians.

required
epoch Time

Epoch at which the IERS polar motion is evaluated.

required

Returns:

Type Description
tuple[float, float]

(lat_tirs, lon_tirs) in radians.

Source code in src/serval/rotate.py
def itrs_to_tirs_latlon(
    geodetic_lat: float,
    geodetic_lon: float,
    epoch: Time,
) -> tuple[float, float]:
    r"""TIRS geocentric latitude and longitude for an ITRS observer at a given epoch.

    Applies the IERS polar motion matrix $W = \mathrm{erfa.pom00}$ to the geodetic
    zenith direction in ITRS, then returns its TIRS spherical coordinates.  Passing
    the returned angles to [enu_to_tirs][serval.rotate.enu_to_tirs] or
    [tirs_to_zenith][serval.rotate.tirs_to_zenith] makes those functions consistent
    with Astropy's ITRS coordinate chain.

    Parameters
    ----------
    geodetic_lat : float
        Observer WGS84 geodetic latitude in radians.
    geodetic_lon : float
        Observer WGS84 geodetic longitude in radians.
    epoch : astropy.time.Time
        Epoch at which the IERS polar motion is evaluated.

    Returns
    -------
    tuple[float, float]
        ``(lat_tirs, lon_tirs)`` in radians.
    """

    # Can't use spherical_to_normal due to circular imports...
    up_itrs = np.array([
        np.cos(geodetic_lat) * np.cos(geodetic_lon),
        np.cos(geodetic_lat) * np.sin(geodetic_lon),
        np.sin(geodetic_lat),
    ])
    up_tirs = itrs_to_tirs(epoch).apply(up_itrs)
    return float(np.arcsin(up_tirs[2])), float(np.arctan2(up_tirs[1], up_tirs[0]))

offset_pointing(offset_magnitude, offset_direction, offset_boresight=0.0, ref_alt=np.pi / 2, ref_az=np.pi, ref_boresight=0.0)

Convert a pointing offset to an absolute (alt, az, boresight).

The position angle offset_direction uses geographic North/East as reference, which is well-defined at the zenith (over-the-top convention).

Parameters:

Name Type Description Default
offset_magnitude float

Angular offset magnitude from the reference pointing, in radians.

required
offset_direction float

Position angle of the offset in radians, measured from North (0) toward East (\(\pi/2\)).

required
offset_boresight float

Boresight roll in radians, relative to the polarisation-stabilised reference. offset_boresight=0 keeps the X/Y feed axes aligned with the reference orientation.

0.0
ref_alt float

Reference pointing altitude and azimuth in radians. Default: zenith (\(\pi/2\), \(\pi\)).

pi / 2
ref_az float

Reference pointing altitude and azimuth in radians. Default: zenith (\(\pi/2\), \(\pi\)).

pi / 2
ref_boresight float

Reference boresight in radians. Default: 0.

0.0

Returns:

Type Description
alt, az, boresight : tuple[float, float, float]

New pointing parameters in radians, suitable for :func:zenith_to_pointing or :meth:TIRSVoltageBeam.from_ludwig3.

Notes

The (alt, az) computation uses the spherical law of cosines on the triangle formed by the zenith P (alt = \(\pi/2\)), the reference pointing A, and the new pointing B. The sides are the co-altitudes \(b = \pi/2 - \text{ref\_alt}\) and \(a = \pi/2 - \text{alt}\), and the offset magnitude \(\rho\). The angle at A is the position angle \(\psi\) (offset_direction), measured from the great circle toward the zenith (North) toward increasing azimuth (East):

\[\sin(\mathrm{alt}) = \sin(\mathrm{ref\_alt})\cos\rho + \cos(\mathrm{ref\_alt})\sin\rho\cos\psi\]
\[\Delta\mathrm{az} = \mathrm{arctan2}\!\bigl(\sin\rho\sin\psi,\; \cos(\mathrm{ref\_alt})\cos\rho - \sin(\mathrm{ref\_alt})\sin\rho\cos\psi\bigr)\]

arctan2 handles the over-the-top case naturally.

Source code in src/serval/rotate.py
def offset_pointing(
    offset_magnitude: float,
    offset_direction: float,
    offset_boresight: float = 0.0,
    ref_alt: float = np.pi / 2,
    ref_az: float = np.pi,
    ref_boresight: float = 0.0,
) -> tuple[float, float, float]:
    r"""Convert a pointing offset to an absolute (alt, az, boresight).

    The position angle ``offset_direction`` uses geographic North/East as reference,
    which is well-defined at the zenith (over-the-top convention).

    Parameters
    ----------
    offset_magnitude : float
        Angular offset magnitude from the reference pointing, in radians.
    offset_direction : float
        Position angle of the offset in radians, measured from North (0) toward East ($\pi/2$).
    offset_boresight : float
        Boresight roll in radians, relative to the polarisation-stabilised reference.
        ``offset_boresight=0`` keeps the X/Y feed axes aligned with the reference orientation.
    ref_alt, ref_az : float
        Reference pointing altitude and azimuth in radians. Default: zenith ($\pi/2$, $\pi$).
    ref_boresight : float
        Reference boresight in radians. Default: 0.

    Returns
    -------
    alt, az, boresight : tuple[float, float, float]
        New pointing parameters in radians, suitable for :func:`zenith_to_pointing`
        or :meth:`TIRSVoltageBeam.from_ludwig3`.

    Notes
    -----
    The (alt, az) computation uses the spherical law of cosines on the triangle formed by
    the zenith **P** (alt = $\pi/2$), the reference pointing **A**, and the new pointing **B**.
    The sides are the co-altitudes $b = \pi/2 - \text{ref\_alt}$ and $a = \pi/2 - \text{alt}$,
    and the offset magnitude $\rho$. The angle at **A** is the position angle $\psi$
    (``offset_direction``), measured from the great circle toward the zenith (North) toward
    increasing azimuth (East):

    $$\sin(\mathrm{alt}) = \sin(\mathrm{ref\_alt})\cos\rho
    + \cos(\mathrm{ref\_alt})\sin\rho\cos\psi$$

    $$\Delta\mathrm{az} = \mathrm{arctan2}\!\bigl(\sin\rho\sin\psi,\;
        \cos(\mathrm{ref\_alt})\cos\rho - \sin(\mathrm{ref\_alt})\sin\rho\cos\psi\bigr)$$

    ``arctan2`` handles the over-the-top case naturally.
    """
    if offset_magnitude == 0.0:
        return ref_alt, ref_az, ref_boresight

    sin_alt = (
        np.sin(ref_alt) * np.cos(offset_magnitude)
        + np.cos(ref_alt) * np.sin(offset_magnitude) * np.cos(offset_direction)
    )
    alt = float(np.arcsin(np.clip(sin_alt, -1.0, 1.0)))
    daz = np.arctan2(
        np.sin(offset_magnitude) * np.sin(offset_direction),
        np.cos(ref_alt) * np.cos(offset_magnitude)
        - np.sin(ref_alt) * np.sin(offset_magnitude) * np.cos(offset_direction),
    )
    az = float((ref_az + daz + np.pi) % (2 * np.pi) - np.pi)
    boresight = (
        boresight_for_fixed_pol_ref(alt, az, ref_alt, ref_az, ref_boresight) + offset_boresight
    )
    return alt, az, float(boresight)

rotation_from_eulers(eulers, degrees=False)

Creates a rotation object from a set of Euler angles.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
eulers tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the rotation.

required
degrees bool

Whether the input angles are in degrees, by default False.

False

Returns:

Type Description
Rotation

The Rotation object.

Source code in src/serval/rotate.py
def rotation_from_eulers(
    eulers: EulersType,
    degrees: None | bool = False,
) -> Rotation:
    r"""Creates a rotation object from a set of Euler angles.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    eulers : tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the rotation.
    degrees : bool, optional
        Whether the input angles are in degrees, by default False.
    Returns
    -------
    Rotation
        The Rotation object.
    """
    # .inv() for passive convention
    return Rotation.from_euler("ZYZ", eulers, degrees=degrees).inv()

tirs_to_baseline_direc(latitude, longitude, east, north, up)

Creates a rotation object for basis rotations from the TIRS frame to a frame aligned with the direction of the baseline vector.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to baseline direction frame basis rotation.

Source code in src/serval/rotate.py
def tirs_to_baseline_direc(
    latitude: float, longitude: float, east: float, north: float, up: float
) -> Rotation:
    r"""Creates a rotation object for basis rotations from the TIRS frame to a
    frame aligned with the direction of the baseline vector.
    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to baseline direction frame basis rotation.
    """
    return zenith_to_baseline_direc(east=east, north=north, up=up) * tirs_to_zenith(
        latitude=latitude, longitude=longitude
    )

tirs_to_baseline_direc_eulers(latitude, longitude, east, north, up)

Returns the Euler angles for basis rotations from the TIRS frame to a frame aligned with the direction of the baseline vector.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to baseline frame basis rotation.

Source code in src/serval/rotate.py
def tirs_to_baseline_direc_eulers(
    latitude: float, longitude: float, east: float, north: float, up: float
) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the TIRS frame to a
    frame aligned with the direction of the baseline vector.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to baseline frame
         basis rotation.
    """
    return eulers_from_rotation(
        tirs_to_baseline_direc(
            latitude=latitude, longitude=longitude, east=east, north=north, up=up
        )
    )

tirs_to_cirs(era)

Creates a rotation object corresponding to the basis rotation from the TIRS frame to the CIRS frame.

Parameters:

Name Type Description Default
era float

The Earth Rotation Angle in radians.

required

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to CIRS basis rotation.

Source code in src/serval/rotate.py
def tirs_to_cirs(era: float) -> Rotation:
    r"""Creates a rotation object corresponding to the basis rotation
    from the TIRS frame to the CIRS frame.
    Parameters
    ----------
    era : float
        The Earth Rotation Angle in radians.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to CIRS basis rotation.
    """
    return rotation_from_eulers(tirs_to_cirs_eulers(era))

tirs_to_cirs_eulers(era)

Provides the Euler angles corresponding to the basis rotation from the TIRS frame to the CIRS frame.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
era float

The Earth Rotation Angle in radians.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to CIRS basis rotation.

Source code in src/serval/rotate.py
def tirs_to_cirs_eulers(era: float) -> EulersType:
    r"""Provides the Euler angles corresponding to the basis rotation
    from the TIRS frame to the CIRS frame.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    era : float
        The Earth Rotation Angle in radians.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to
        CIRS basis rotation.
    """
    return (-era, 0, 0)  # See USNO Circular 179

tirs_to_pointing(latitude, longitude, altitude=np.pi / 2, azimuth=np.pi, boresight=0.0)

Creates a rotation object for basis rotations from the TIRS frame to a frame aligned with the telescope pointing direction.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
altitude float

The altitude angle in radians, by default np.pi / 2.

pi / 2
azimuth float

The azimuth angle in radians, by default np.pi.

pi
boresight float

The boresight angle in radians, by default 0.0.

0.0

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to telescope pointing frame basis rotation.

Source code in src/serval/rotate.py
def tirs_to_pointing(
    latitude: float,
    longitude: float,
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
    boresight: float = 0.0,
) -> Rotation:
    r"""Creates a rotation object for basis rotations from the TIRS frame to a frame aligned with
    the telescope pointing direction.
    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    altitude : float, optional
        The altitude angle in radians, by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.0.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to telescope pointing frame basis rotation.
    """
    return zenith_to_pointing(
        altitude=altitude, azimuth=azimuth, boresight=boresight
    ) * tirs_to_zenith(latitude=latitude, longitude=longitude)

tirs_to_pointing_eulers(latitude, longitude, altitude=np.pi / 2, azimuth=np.pi, boresight=0.0)

Returns the Euler angles for basis rotations from the TIRS frame to a frame aligned with the telescope pointing direction.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required
altitude float

The altitude angle in radians, by default np.pi / 2.

pi / 2
azimuth float

The azimuth angle in radians, by default np.pi.

pi
boresight float

The boresight angle in radians, by default 0.0.

0.0

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to telescope pointing basis rotation.

Source code in src/serval/rotate.py
def tirs_to_pointing_eulers(
    latitude: float,
    longitude: float,
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
    boresight: float = 0.0,
) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the TIRS frame to a frame aligned
    with the telescope pointing direction.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    altitude : float, optional
        The altitude angle in radians, by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.0.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to telescope pointing
        basis rotation.
    """
    return eulers_from_rotation(
        tirs_to_pointing(
            latitude=latitude,
            longitude=longitude,
            altitude=altitude,
            azimuth=azimuth,
            boresight=boresight,
        )
    )

tirs_to_zenith(latitude, longitude)

Creates a rotation object for basis rotations from the TIRS frame to the local Zenith frame for an observer at a given latitude and longitude.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required

Returns:

Type Description
Rotation

A Rotation object representing the TIRS to local Zenith basis rotation.

Source code in src/serval/rotate.py
def tirs_to_zenith(latitude: float, longitude: float) -> Rotation:
    r"""Creates a rotation object for basis rotations from the TIRS frame to the local Zenith frame
    for an observer at a given latitude and longitude.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    Returns
    -------
    Rotation
        A Rotation object representing the TIRS to local Zenith basis rotation.
    """
    return rotation_from_eulers(
        tirs_to_zenith_eulers(latitude=latitude, longitude=longitude),
    )

tirs_to_zenith_eulers(latitude, longitude)

Returns the Euler angles for basis rotations from the TIRS frame to the local Zenith frame for an observer at a given latitude and longitude.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
latitude float

The latitude in radians.

required
longitude float

The longitude in radians.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the TIRS to local Zenith basis rotation.

Source code in src/serval/rotate.py
def tirs_to_zenith_eulers(latitude: float, longitude: float) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the TIRS frame to the local Zenith frame
    for an observer at a given latitude and longitude.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    latitude : float
        The latitude in radians.
    longitude : float
        The longitude in radians.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the TIRS to local Zenith
        basis rotation.
    """
    return (longitude, np.pi / 2 - latitude, 0.0)

wigner_d(lmax, mprime_max, eulers_or_rotation)

Generates Wigner-D matrices up to a given lmax for specified Euler angles or a Rotation object.

Parameters:

Name Type Description Default
lmax int

The maximum degree of the Wigner-D matrices.

required
mprime_max int

The maximum order of m' for the Wigner-D matrices.

required
eulers_or_rotation tuple[float, float, float] | Rotation

The Euler angles (alpha, beta, gamma) specifying the rotation, or a Rotation object.

required

Returns:

Type Description
NDArray[complex128]

An array of shape (lmax+1, 2*mprime_max+1, 2*lmax+1) containing the Wigner-D matrices.

Source code in src/serval/rotate.py
def wigner_d(
    lmax: int,
    mprime_max: int,
    eulers_or_rotation: EulersType | Rotation,
) -> npt.NDArray[np.complex128]:
    r"""Generates Wigner-D matrices up to a given lmax for specified Euler angles or
    a Rotation object.

    Parameters
    ----------
    lmax : int
        The maximum degree of the Wigner-D matrices.
    mprime_max : int
        The maximum order of m' for the Wigner-D matrices.
    eulers_or_rotation : tuple[float, float, float] | Rotation
        The Euler angles (alpha, beta, gamma) specifying the rotation,
        or a Rotation object.

    Returns
    -------
    npt.NDArray[np.complex128]
        An array of shape ``(lmax+1, 2*mprime_max+1, 2*lmax+1)`` containing the
        Wigner-D matrices.
    """
    if isinstance(eulers_or_rotation, Rotation):
        eulers = eulers_from_rotation(eulers_or_rotation)
    else:
        eulers = eulers_or_rotation

    # One-hot-dot reconstruction of wigner matrix through per-mprime transforming.
    # To rotate, you contract on the first axis of the wigner-d matrix.
    # Allocate the final output shape directly to avoid large intermediates and
    # concatenation copies. Axes: (l, mprime, m) with mprime in [-mprime_max, mprime_max]
    # and m in [-lmax, lmax].
    out = empty_complex_array((lmax + 1, 2 * mprime_max + 1, 2 * lmax + 1))
    # Precomputed sign vector for the symmetry fill: (-1)^(mprime - m) for each m column.
    # D^l_{-m', m} = (-1)^{m'-m} * conj(D^l_{m', -m})
    m_vals = np.arange(-lmax, lmax + 1)
    one_hot = pysh.SHCoeffs.from_zeros(lmax=lmax, kind="complex", **SHT_CONVENTIONS)
    for mprime in range(0, mprime_max + 1):
        one_hot.coeffs[:] = 0.0
        one_hot.coeffs[0, mprime:, mprime] = 1.0
        rotted = one_hot.rotate(*eulers, **ROTATE_CONVENTIONS)
        # rotted.coeffs shape: (2, lmax+1, lmax+1) = (sign_m, l, |m|)
        # Fill positive-m columns (m=0..lmax → output columns lmax..2*lmax)
        out[:, mprime_max + mprime, lmax:] = rotted.coeffs[0]
        # Fill negative-m columns (m=-lmax..-1 → output columns 0..lmax-1)
        out[:, mprime_max + mprime, :lmax] = rotted.coeffs[1, :, lmax:0:-1]
        # Fill negative-mprime row via symmetry
        if mprime > 0:
            sign_factor = (-1.0) ** (mprime - m_vals)
            out[:, mprime_max - mprime, :] = sign_factor * out[:, mprime_max + mprime, ::-1].conj()

    return out

zenith_to_baseline_direc(east, north, up)

Creates a rotation object for basis rotations from the local Zenith frame to a frame aligned with the direction of the baseline vector.

Parameters:

Name Type Description Default
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
Rotation

A Rotation object representing the local Zenith to baseline direction frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_baseline_direc(east: float, north: float, up: float) -> Rotation:
    r"""Creates a rotation object for basis rotations from the local Zenith frame
    to a frame aligned with the direction of the baseline vector.

    Parameters
    ----------
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    Rotation
        A Rotation object representing the local Zenith to baseline direction
        frame basis rotation.
    """
    return rotation_from_eulers(
        zenith_to_baseline_direc_eulers(east=east, north=north, up=up),
    )

zenith_to_baseline_direc_eulers(east, north, up)

Returns the Euler angles for basis rotations from the local Zenith frame to a frame aligned with the direction of the baseline vector.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
east float

The East component of the baseline vector.

required
north float

The North component of the baseline vector.

required
up float

The Up component of the baseline vector.

required

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the local Zenith to baseline direction frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_baseline_direc_eulers(east: float, north: float, up: float) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the local Zenith frame
    to a frame aligned with the direction of the baseline vector.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    east : float
        The East component of the baseline vector.
    north : float
        The North component of the baseline vector.
    up : float
        The Up component of the baseline vector.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the local Zenith to baseline direction
        frame basis rotation.
    """
    norm = np.linalg.norm([east, north, up])
    if norm == 0.0:
        return (0.0, 0.0, 0.0)
    else:
        return (
            -np.arctan2(east / norm, north / norm),
            np.arcsin(up / norm) - np.pi / 2,
            0.0,
        )

zenith_to_pointing(altitude=np.pi / 2, azimuth=np.pi, boresight=0.0)

Creates a rotation object from the local Zenith frame to the to a frame aligned with the telescope pointing direction.

Parameters:

Name Type Description Default
altitude float

The altitude angle in radians , by default np.pi / 2.

pi / 2
azimuth float

The azimuth angle in radians, by default np.pi.

pi
boresight float

The boresight angle in radians, by default 0.

0.0

Returns:

Type Description
Rotation

A Rotation object representing the local Zenith to telescope pointing frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_pointing(
    altitude: float = np.pi / 2,
    azimuth: float = np.pi,
    boresight: float = 0.0,
) -> Rotation:
    r"""Creates a rotation object from the local Zenith frame to the to a frame aligned with
    the telescope pointing direction.
    Parameters
    ----------
    altitude : float, optional
        The altitude angle in radians , by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.
    Returns
    -------
    Rotation
        A Rotation object representing the local Zenith to telescope pointing frame basis rotation.
    """
    return rotation_from_eulers(
        zenith_to_pointing_eulers(altitude=altitude, azimuth=azimuth, boresight=boresight),
    )

zenith_to_pointing_eulers(altitude=None, azimuth=None, boresight=None)

Returns the Euler angles for basis rotations from the local Zenith frame to a frame aligned with the telescope pointing direction.

The convention used here and throughout is that of ZYZ passive (basis) rotations.

Parameters:

Name Type Description Default
altitude float

The altitude angle in radians, by default np.pi / 2.

None
azimuth float

The azimuth angle in radians, by default np.pi.

None
boresight float

The boresight angle in radians, by default 0.0.

None

Returns:

Type Description
tuple[float, float, float]

The Euler angles (alpha, beta, gamma) specifying the local Zenith to pointing frame basis rotation.

Source code in src/serval/rotate.py
def zenith_to_pointing_eulers(
    altitude: float | None = None,
    azimuth: float | None = None,
    boresight: float | None = None,
) -> EulersType:
    r"""Returns the Euler angles for basis rotations from the local Zenith frame
    to a frame aligned with the telescope pointing direction.

    The convention used here and throughout is that of
    ZYZ passive (basis) rotations.

    Parameters
    ----------
    altitude : float, optional
        The altitude angle in radians, by default np.pi / 2.
    azimuth : float, optional
        The azimuth angle in radians, by default np.pi.
    boresight : float, optional
        The boresight angle in radians, by default 0.0.
    Returns
    -------
    tuple[float, float, float]
        The Euler angles (alpha, beta, gamma) specifying the local Zenith to pointing
        frame basis rotation.
    """
    altitude = np.pi / 2 if altitude is None else altitude  # Default
    azimuth = np.pi if azimuth is None else azimuth  # Default
    boresight = 0.0 if boresight is None else boresight  # Default
    return (-azimuth, altitude - np.pi / 2, boresight)

Spherical Harmonic Transform Utilities (serval.sht.py)

alm_from_healpix(healpix_alm, return_sparse=True)

alm_from_healpix(
    healpix_alm: npt.NDArray[np.complex128],
    return_sparse: Literal[True] = ...,
) -> sparse.COO
alm_from_healpix(
    healpix_alm: npt.NDArray[np.complex128],
    return_sparse: Literal[False] = ...,
) -> npt.NDArray[np.complex128]

Transforms spherical harmonics coefficients from healpy format (1D) to standard Fourier order format (2D). It can return either a sparse or dense array.

Parameters:

Name Type Description Default
healpix_alm NDArray[complex128]

A spherical harmonics coefficients array in healpy format (1D).

required
return_sparse bool

Whether to return a sparse array or a dense one.

True

Returns:

Type Description
COO | NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order (2D), in sparse or dense form.

Source code in src/serval/sht.py
def alm_from_healpix(
    healpix_alm: npt.NDArray[np.complex128], return_sparse: bool = True
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Transforms spherical harmonics coefficients from healpy format (1D)
    to standard Fourier order format (2D).
    It can return either a sparse or dense array.
    Parameters
    ----------
    healpix_alm : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in healpy format (1D).
    return_sparse : bool, optional
        Whether to return a sparse array or a dense one.
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order (2D),
        in sparse or dense form.
    """

    lmax = get_lmax_from_1D_alm(healpix_alm.size)
    ell, m = get_alm_ls_ms(lmax)
    # healpy includes the Condon-Shortley phase (-1)^m in Y_lm for m > 0,
    # but pyshtools with csphase=1 excludes it.  Apply the sign correction
    # so the stored alm are in pyshtools convention.
    cs_sign = (-1) ** m
    sparse_alm = sparse.COO([ell, m + lmax], cs_sign * healpix_alm, shape=(lmax + 1, 2 * lmax + 1))
    # Fill in the redundant negative-m part using pyshtools reality condition:
    # a_{l,-m} = (-1)^m * conj(a_{l,m})  →  conj(healpix_alm[m>0]) (no extra sign).
    sparse_alm += sparse.COO(
        [ell[m > 0], lmax - m[m > 0]],
        healpix_alm[m > 0].conj(),
        shape=(lmax + 1, 2 * lmax + 1),
    )
    if return_sparse:
        return sparse_alm
    else:
        return sparse_alm.todense()

analysis(grid, threshold=SPARSE_THRESHOLD, conjugate=False, return_sparse=True)

analysis(
    grid: pysh.SHGrid,
    threshold: float = ...,
    conjugate: bool = ...,
    return_sparse: Literal[True] = ...,
) -> sparse.COO
analysis(
    grid: pysh.SHGrid,
    threshold: float = ...,
    conjugate: bool = ...,
    return_sparse: Literal[False] = ...,
) -> npt.NDArray[np.complex128]

Transforms a pyshtools grid to the corresponding spherical harmonics coefficients or their conjugates in standard Fourier order. It can return either a sparse or dense array.

Parameters:

Name Type Description Default
grid SHGrid

A spherical harmonic grid object.

required
threshold float

The threshold below which coefficients are set to zero.

SPARSE_THRESHOLD
conjugate bool

Whether to return the conjugate spherical harmonics coefficients.

False
return_sparse bool

Whether to return a sparse array or a dense one.

True

Returns:

Type Description
COO | NDArray[complex128]

The spherical harmonics coefficients array or their conjugates in standard Fourier order, in sparse or dense form.

Source code in src/serval/sht.py
def analysis(
    grid: pysh.SHGrid,
    threshold: float = SPARSE_THRESHOLD,
    conjugate: bool = False,
    return_sparse: bool = True,
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Transforms a pyshtools grid to the corresponding spherical harmonics coefficients or their
    conjugates in standard Fourier order. It can return either a sparse or dense array.
    Parameters
    ----------
    grid : pysh.SHGrid
        A spherical harmonic grid object.
    threshold : float, optional
        The threshold below which coefficients are set to zero.
    conjugate : bool, optional
        Whether to return the conjugate spherical harmonics coefficients.
    return_sparse : bool, optional
        Whether to return a sparse array or a dense one.
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        The spherical harmonics coefficients array or their conjugates in standard Fourier order,
        in sparse or dense form.
    """
    sparse_alm = to_sparse(grid.expand(**SHT_CONVENTIONS), threshold=threshold)
    if conjugate:
        out = make_conjugate(sparse_alm)
    else:
        out = sparse_alm
    if return_sparse:
        return out
    else:
        assert isinstance(out, sparse.COO)
        return out.todense()

array_synthesis(array_coeff)

Transfomrs spherical harmonics coefficients from a pyshtools array to a pyshtools grid.

Parameters:

Name Type Description Default
array_coeff NDArray[complex128]

A spherical harmonics coefficients array in pytshtools format. Its shape should be (lmax + 1, 2 * mmax + 1).

required

Returns:

Type Description
SHGrid

A spherical harmonic grid object.

Source code in src/serval/sht.py
def array_synthesis(array_coeff: npt.NDArray[np.complex128]) -> pysh.SHGrid:
    r"""Transfomrs spherical harmonics coefficients from a pyshtools array to a pyshtools grid.
    Parameters
    ----------
    array_coeff : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in pytshtools format. Its shape
        should be (lmax + 1, 2 * mmax + 1).
    Returns
    -------
    pysh.SHGrid
        A spherical harmonic grid object.
    """
    return pysh.SHCoeffs.from_array(unstack_ms(array_coeff), **SHT_CONVENTIONS).expand(
        grid=GRID_CONVENTIONS["grid"], extend=GRID_CONVENTIONS["extend"]
    )

batch_array_analysis(map_batch, lmax)

Batch-analyze complex DH grids to SERVAL alm arrays via pyshtools.

Parameters:

Name Type Description Default
map_batch NDArray[complex128]

Complex grid data of shape (..., ntheta, nphi_ring) where ntheta = nphi_ring = 2*(lmax+1).

required
lmax int

Maximum spherical harmonic degree.

required

Returns:

Type Description
NDArray[complex128]

SERVAL alm array of shape (..., lmax+1, 2*lmax+1).

Source code in src/serval/sht.py
def batch_array_analysis(
    map_batch: npt.NDArray[np.complex128],
    lmax: int,
) -> npt.NDArray[np.complex128]:
    r"""Batch-analyze complex DH grids to SERVAL alm arrays via pyshtools.

    Parameters
    ----------
    map_batch : npt.NDArray[np.complex128]
        Complex grid data of shape ``(..., ntheta, nphi_ring)`` where
        ``ntheta = nphi_ring = 2*(lmax+1)``.
    lmax : int
        Maximum spherical harmonic degree.

    Returns
    -------
    npt.NDArray[np.complex128]
        SERVAL alm array of shape ``(..., lmax+1, 2*lmax+1)``.
    """
    batch_shape = map_batch.shape[:-2]
    n = int(np.prod(batch_shape))
    maps_flat = map_batch.reshape(n, *map_batch.shape[-2:])
    template = grid_template(lmax)
    alms = np.empty((n, lmax + 1, 2 * lmax + 1), dtype=np.complex128)
    for i in range(n):
        g = template.copy()
        g.data = maps_flat[i]
        alms[i] = analysis(g, return_sparse=False)
    return alms.reshape(*batch_shape, lmax + 1, 2 * lmax + 1)

batch_array_synthesis(alm_batch, lmax, nthreads=0)

Batch-synthesize complex SH alm arrays to complex DH grids via ducc0.

Uses ducc0.sht.experimental.synthesis with ntrans=2n (real and imaginary components stacked) to replace n separate pyshtools synthesis calls.

Parameters:

Name Type Description Default
alm_batch NDArray[complex128]

SERVAL alm array of shape (..., lmax+1, 2*lmax+1), already band-limited to lmax in both degree and order.

required
lmax int

Maximum spherical harmonic degree.

required
nthreads int

Number of threads for ducc0 (0 = all hardware threads).

0

Returns:

Type Description
NDArray[complex128]

Complex grid data of shape (..., ntheta, nphi_ring) where ntheta = nphi_ring = 2*(lmax+1).

Source code in src/serval/sht.py
def batch_array_synthesis(
    alm_batch: npt.NDArray[np.complex128],
    lmax: int,
    nthreads: int = 0,
) -> npt.NDArray[np.complex128]:
    r"""Batch-synthesize complex SH alm arrays to complex DH grids via ducc0.

    Uses ``ducc0.sht.experimental.synthesis`` with ``ntrans=2n`` (real and imaginary
    components stacked) to replace ``n`` separate pyshtools synthesis calls.

    Parameters
    ----------
    alm_batch : npt.NDArray[np.complex128]
        SERVAL alm array of shape ``(..., lmax+1, 2*lmax+1)``, already band-limited
        to ``lmax`` in both degree and order.
    lmax : int
        Maximum spherical harmonic degree.
    nthreads : int
        Number of threads for ducc0 (0 = all hardware threads).

    Returns
    -------
    npt.NDArray[np.complex128]
        Complex grid data of shape ``(..., ntheta, nphi_ring)`` where
        ``ntheta = nphi_ring = 2*(lmax+1)``.
    """
    batch_shape = alm_batch.shape[:-2]
    n = int(np.prod(batch_shape))
    alm_flat = alm_batch.reshape(n, *alm_batch.shape[-2:])           # (n, lmax+1, 2*lmax+1)
    theta, nphi, phi0, ringstart, ntheta, nphi_ring = _dh_grid_params(lmax)
    aR, aI = serval_to_ducc0_alm(alm_flat, lmax, lmax)              # (n, ncoeff) each
    alm_stack = np.concatenate([aR, aI], axis=0)[:, np.newaxis, :]  # (2n, 1, ncoeff)
    maps = ducc0.sht.experimental.synthesis(
        alm=alm_stack,
        theta=theta,
        nphi=nphi,
        phi0=phi0,
        ringstart=ringstart,
        lmax=lmax,
        spin=0,
        nthreads=nthreads,
    )  # (2n, 1, ntheta*nphi_ring)
    maps = maps[:, 0, :].reshape(2 * n, ntheta, nphi_ring)
    return (maps[:n] + 1j * maps[n:]).reshape(*batch_shape, ntheta, nphi_ring)

broadcast_bandlimits(*coeffs, lmax=None)

Broadcasts a set of spherical harmonics coefficients to common bandlimits. These are either given or inferred from the maximum lmax among the inputs.

Parameters:

Name Type Description Default
*coeffs Coeffs

A variable set of sparse or dense spherical harmonics coefficients arrays in standard Fourier order. The shape or array i in the set should be (lmax_i + 1, 2 * mmax_i + 1) for each i.

()
lmax int | None

The target maximum spherical harmonic degree. If None, it is inferred from the maximum lmax among the input coefficients.

None

Returns:

Type Description
tuple[Coeffs, ...]

A tuple containing the input spherical harmonics coefficients arrays reshaped to the common target bandlimits.

Source code in src/serval/sht.py
def broadcast_bandlimits(*coeffs: Coeffs, lmax: int | None = None) -> tuple[Coeffs, ...]:
    r"""Broadcasts a set of spherical harmonics coefficients to common bandlimits. These are either
    given or inferred from the maximum lmax among the inputs.
    Parameters
    ----------
    *coeffs : Coeffs
        A variable set of sparse or dense spherical harmonics coefficients arrays
        in standard Fourier order. The shape or array i in the set should be
        (lmax_i + 1, 2 * mmax_i + 1) for each i.
    lmax : int | None, optional
        The target maximum spherical harmonic degree. If None, it is inferred from the maximum
        lmax among the input coefficients.
    Returns
    -------
    tuple[Coeffs, ...]
        A tuple containing the input spherical harmonics coefficients arrays reshaped to the
        common target bandlimits.
    """
    if lmax is None:
        target_lmax = max([coeff.shape[0] - 1 for coeff in coeffs])
    else:
        target_lmax = lmax
    return tuple(set_bandlimits(coeff, lmax=target_lmax) for coeff in coeffs)

compute_mmodes(coeffs1, coeffs2)

Computes the mmodes between two spherical harmonics coefficients sparse or dense arrays. The relative formula is given by:

\[ \mathrm{mmodes}[m] = \sum_{\ell=0}^{l_{\max}} \mathrm{coeffs1}[\ell, m]\, \mathrm{coeffs2}[\ell, m] \]

Parameters:

Name Type Description Default
coeffs1 Coeffs

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax1 + 1, 2 * mmax1 + 1).

required
coeffs2 Coeffs

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax2 + 1, 2 * mmax2 + 1).

required

Returns:

Type Description
Coeffs

A sparse or dense spherical harmonics mmodes array resulting from the inner product of the input coefficients. Its shape will be (2 * max(mmax1, mmax2) + 1,).

Source code in src/serval/sht.py
def compute_mmodes(
    coeffs1: Coeffs,
    coeffs2: Coeffs,
) -> Coeffs:
    r"""Computes the mmodes between two spherical harmonics coefficients sparse or dense arrays.
        The relative formula is given by:

    $$
    \mathrm{mmodes}[m] = \sum_{\ell=0}^{l_{\max}}
        \mathrm{coeffs1}[\ell, m]\,
        \mathrm{coeffs2}[\ell, m]
    $$

    Parameters
    ----------
    coeffs1 : Coeffs
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax1 + 1, 2 * mmax1 + 1).
    coeffs2 : Coeffs
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax2 + 1, 2 * mmax2 + 1).
    Returns
    -------
    Coeffs
        A sparse or dense spherical harmonics mmodes array resulting from the inner product
        of the input coefficients. Its shape will be (2 * max(mmax1, mmax2) + 1,).
    """
    xp = array_namespace(coeffs1, coeffs2)
    expanded_coeffs1, expanded_coeffs2 = broadcast_bandlimits(coeffs1, coeffs2)
    mmodes = xp.sum(expanded_coeffs1 * expanded_coeffs2, axis=0)
    return mmodes

ducc0_alm2map(alms, kwargs)

Calculate the maps for the provided alms.

Parameters:

Name Type Description Default
alms NDArray[complex128]

Collection of 1D alms.

required
kwargs dict

Dictionary of arguments for the ducc0 function.

required

Returns:

Type Description
npt.NDArray[np.float64]

The corresponding 1D maps for the alms.

Source code in src/serval/sht.py
def ducc0_alm2map(alms: npt.NDArray[np.complex128], kwargs: dict) -> npt.NDArray[np.float64]:
    r"""Calculate the maps for the provided alms.

    Parameters
    ----------
    alms : pt.NDArray[np.complex128]
        Collection of 1D alms.
    kwargs : dict
        Dictionary of arguments for the ducc0 function.

    Returns
    -------
     npt.NDArray[np.float64]
        The corresponding 1D maps for the alms.
    """
    if alms.ndim != 2:
        raise ValueError("The dimensions of the alms are wrong.")

    return ducc0.sht.synthesis(alm=alms, **kwargs)

ducc0_map2alm(maps, kwargs)

Calculate the Spherical Harmonic coefficients alm for the provided maps.

Parameters:

Name Type Description Default
maps NDArray[float64]

Collection of 1D Healpix maps of size npix = 12 * nside ** 2.

required
kwargs dict

Dictionary of arguments for the ducc0 function.

required

Returns:

Type Description
NDArray[complex128]

The corresponding 1D alms for the maps.

Source code in src/serval/sht.py
def ducc0_map2alm(maps: npt.NDArray[np.float64], kwargs: dict) -> npt.NDArray[np.complex128]:
    r"""Calculate the Spherical Harmonic coefficients alm for the provided maps.

    Parameters
    ----------
    maps : npt.NDArray[np.float64]
        Collection of 1D Healpix maps of size npix = 12 * nside ** 2.
    kwargs : dict
        Dictionary of arguments for the ducc0 function.

    Returns
    -------
    npt.NDArray[np.complex128]
        The corresponding 1D alms for the maps.
    """
    if maps.ndim != 2:
        raise ValueError("The dimensions of the maps are wrong.")

    return ducc0.sht.adjoint_synthesis(map=maps, **kwargs)

ducc0_to_serval_alm(aR, aI, lmax)

Convert the (aR, aI) pair from ducc0.sht.rotate_alm back to SERVAL format.

This is the inverse of :func:serval_to_ducc0_alm. The output always has mmax = lmax because a rotation generically populates all m-modes.

The reconstruction follows:

\[c_{\ell,m} = (-1)^m (a_{R,\ell m} + i\,a_{I,\ell m}) \quad (m > 0)\]
\[c_{\ell,-m} = \mathrm{Re}(a_{R,\ell m}) + \mathrm{Im}(a_{I,\ell m}) + i\bigl(\mathrm{Re}(a_{I,\ell m}) - \mathrm{Im}(a_{R,\ell m})\bigr) \quad (m > 0)\]
\[c_{\ell,0} = \mathrm{Re}(a_{R,\ell 0}) + i\,\mathrm{Re}(a_{I,\ell 0})\]

Parameters:

Name Type Description Default
aR NDArray[complex128]

Arrays of shape (..., ncoeff) where ncoeff = (lmax+1)*(lmax+2)//2, as returned by ducc0.sht.rotate_alm.

required
aI NDArray[complex128]

Arrays of shape (..., ncoeff) where ncoeff = (lmax+1)*(lmax+2)//2, as returned by ducc0.sht.rotate_alm.

required
lmax int

Maximum degree. Output mmax equals lmax.

required

Returns:

Type Description
NDArray[complex128]

SERVAL alm of shape (..., lmax+1, 2*lmax+1).

Source code in src/serval/sht.py
def ducc0_to_serval_alm(
    aR: npt.NDArray[np.complex128],
    aI: npt.NDArray[np.complex128],
    lmax: int,
) -> npt.NDArray[np.complex128]:
    r"""Convert the ``(aR, aI)`` pair from ``ducc0.sht.rotate_alm`` back to SERVAL format.

    This is the inverse of :func:`serval_to_ducc0_alm`.  The output always has
    ``mmax = lmax`` because a rotation generically populates all m-modes.

    The reconstruction follows:

    $$c_{\ell,m} = (-1)^m (a_{R,\ell m} + i\,a_{I,\ell m}) \quad (m > 0)$$

    $$c_{\ell,-m} = \mathrm{Re}(a_{R,\ell m}) + \mathrm{Im}(a_{I,\ell m})
                   + i\bigl(\mathrm{Re}(a_{I,\ell m}) - \mathrm{Im}(a_{R,\ell m})\bigr)
                   \quad (m > 0)$$

    $$c_{\ell,0} = \mathrm{Re}(a_{R,\ell 0}) + i\,\mathrm{Re}(a_{I,\ell 0})$$

    Parameters
    ----------
    aR, aI : npt.NDArray[np.complex128]
        Arrays of shape ``(..., ncoeff)`` where ``ncoeff = (lmax+1)*(lmax+2)//2``,
        as returned by ``ducc0.sht.rotate_alm``.
    lmax : int
        Maximum degree.  Output ``mmax`` equals ``lmax``.

    Returns
    -------
    npt.NDArray[np.complex128]
        SERVAL alm of shape ``(..., lmax+1, 2*lmax+1)``.
    """
    ls_arr, ms_arr, m0_mask, mpos_mask = _ducc0_triangular_indices(lmax)
    sign_mpos = (-1.0) ** ms_arr[mpos_mask]

    batch_shape = aR.shape[:-1]
    out = empty_complex_array((*batch_shape, lmax + 1, 2 * lmax + 1))

    # m=0
    out[..., ls_arr[m0_mask], lmax] = aR[..., m0_mask].real + 1j * aI[..., m0_mask].real

    # m>0: positive-m columns
    out[..., ls_arr[mpos_mask], lmax + ms_arr[mpos_mask]] = sign_mpos * (
        aR[..., mpos_mask] + 1j * aI[..., mpos_mask]
    )

    # m>0: negative-m columns
    out[..., ls_arr[mpos_mask], lmax - ms_arr[mpos_mask]] = (
        aR[..., mpos_mask].real + aI[..., mpos_mask].imag
    ) + 1j * (aI[..., mpos_mask].real - aR[..., mpos_mask].imag)

    return out

empty_complex_array(shape)

Generates a zero numpy complex array for the given shape.

Args: shape (tuple): dimensions of the array.

Returns: npt.NDArray[np.complex128]: zero numpy array

Source code in src/serval/sht.py
def empty_complex_array(shape: tuple) -> npt.NDArray[np.complex128]:
    """Generates a zero numpy complex array for the given shape.

    Args:
        shape (tuple): dimensions of the array.

    Returns:
        npt.NDArray[np.complex128]: zero numpy array
    """
    return np.zeros(shape, dtype=np.complex128)

from_sparse(sparse_coeffs, lmax=None)

Transfomrs spherical harmonics coefficients from a sparse array in standard Fourier order to a pyshtools SHCoeffs object of given bandlimits.

Parameters:

Name Type Description Default
sparse_coeffs COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax_sparse + 1, 2 * mmax_sparse + 1).

required
lmax int | None

The target maximum spherical harmonic degree. If None, it is inferred from the input sparse coefficients lmax_sparse.

None

Returns:

Type Description
SHCoeffs

A spherical harmonics coefficients object in pyshtools format. Its shape will be (2, lmax + 1, mmax + 1).

Source code in src/serval/sht.py
def from_sparse(sparse_coeffs: sparse.COO, lmax: int | None = None) -> pysh.SHCoeffs:
    r"""Transfomrs spherical harmonics coefficients from a sparse array in standard Fourier order
    to a pyshtools SHCoeffs object of given bandlimits.
    Parameters
    ----------
    sparse_coeffs : sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax_sparse + 1, 2 * mmax_sparse + 1).
    lmax : int | None, optional
        The target maximum spherical harmonic degree. If None, it is inferred from the input
        sparse coefficients lmax_sparse.
    Returns
    -------
    pysh.SHCoeffs
        A spherical harmonics coefficients object in pyshtools format. Its shape
        will be (2, lmax + 1, mmax + 1).
    """
    if lmax is not None:
        coeff = set_bandlimits(sparse_coeffs, lmax=lmax)
    else:
        lmax = sparse_coeffs.shape[0] - 1
        coeff = set_bandlimits(sparse_coeffs, lmax=lmax)
    return pysh.SHCoeffs.from_array(unstack_ms(coeff.todense()), **SHT_CONVENTIONS)

get_alm_ls_ms(lmax)

Get the l and m pairs for specific lmax. It is assumed mmax = lmax.

Parameters:

Name Type Description Default
lmax int

The maximum l defining the alm layout

required

Returns:

Name Type Description
ls NDArray[int]

The ls corresponding to all the ms.

ms NDArray[int]

The ms for mmax = lmax.

Source code in src/serval/sht.py
def get_alm_ls_ms(lmax) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]:
    r"""Get the l and m pairs for specific lmax.
    It is assumed mmax = lmax.

    Parameters
    ----------
    lmax : int
      The maximum l defining the alm layout

    Returns
    -------
    ls : npt.NDArray[int]
      The ls corresponding to all the ms.
    ms : npt.NDArray[int]
      The ms for mmax = lmax.
    """
    if lmax is None:
        raise TypeError("lmax is None.")
    if lmax < 0:
        raise ValueError("lmax is a negative number.")

    size = (lmax + 1) * (lmax + 2) // 2
    ls = np.zeros(size, dtype=np.int64)
    ms = np.zeros(size, dtype=np.int64)

    idx = 0
    for m in range(lmax + 1):
        for ell in range(m, lmax + 1):
            ls[idx] = ell
            ms[idx] = m
            idx += 1

    return (ls, ms)

get_lmax_from_1D_alm(size)

Returns the lmax for a given spherical harmonics coefficients array in healpy format (1D). It is assumed mmax = lmax.

Parameters:

Name Type Description Default
size int

Size of the array

required

Returns:

Name Type Description
lmax int

The maximum l of the array, or None if it is not a valid size.

Source code in src/serval/sht.py
def get_lmax_from_1D_alm(size: int) -> int | None:
    r"""Returns the lmax for a given spherical harmonics coefficients array
    in healpy format (1D).
    It is assumed mmax = lmax.

    Parameters
    ----------
    size : int
      Size of the array

    Returns
    -------
    lmax : int
      The maximum l of the array, or None if it is not a valid size.
    """
    root = math.isqrt(1 + 8 * size)
    lmax = (root - 3) // 2
    if (lmax + 1) * (lmax + 2) // 2 != size:
        return None
    return lmax

grid_template(lmax)

Creates a spherical harmonic grid object of degree lmax filled with complex zeros.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree of the grid.

required

Returns:

Type Description
SHGrid

A spherical harmonic grid object with data set to (complex) zeros.

Source code in src/serval/sht.py
def grid_template(lmax: int) -> pysh.SHGrid:
    r"""Creates a spherical harmonic grid object of degree lmax filled with complex zeros.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree of the grid.
    Returns
    -------
    pysh.SHGrid
        A spherical harmonic grid object with data set to (complex) zeros.
    """
    return pysh.SHGrid.from_zeros(lmax=lmax, kind="complex", **GRID_CONVENTIONS)

healpix_from_alm(serval_alm)

Transforms spherical harmonics coefficients from standard Fourier order format (2D) to healpy format (1D). This is the inverse of :func:alm_from_healpix.

Parameters:

Name Type Description Default
serval_alm NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order (2D), shape (lmax + 1, 2 * lmax + 1).

required

Returns:

Type Description
NDArray[complex128]

A spherical harmonics coefficients array in healpy format (1D), containing only the m >= 0 entries with the Condon-Shortley phase included.

Source code in src/serval/sht.py
def healpix_from_alm(
    serval_alm: npt.NDArray[np.complex128],
) -> npt.NDArray[np.complex128]:
    r"""Transforms spherical harmonics coefficients from standard Fourier order format (2D)
    to healpy format (1D).  This is the inverse of :func:`alm_from_healpix`.

    Parameters
    ----------
    serval_alm : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order (2D),
        shape ``(lmax + 1, 2 * lmax + 1)``.

    Returns
    -------
    npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in healpy format (1D), containing
        only the m >= 0 entries with the Condon-Shortley phase included.
    """

    lmax = serval_alm.shape[0] - 1
    ell, m = get_alm_ls_ms(lmax)
    # pyshtools with csphase=1 excludes (-1)^m; healpy includes it for m > 0.
    # Multiply by (-1)^m to restore the healpy convention.
    cs_sign = (-1) ** m
    return cs_sign * serval_alm[ell, m + lmax]

healpix_map_base(nside, lmax, spin=0, nthreads=0)

Create the base for the transformation of the 1D healpix map to spherical coordinates.

Parameters:

Name Type Description Default
nside int

The map size.

required
lmax int

The maximum degree of the alms.

required
spin int

The weights used for the map or alms. Default 0.

0
nthreads int

The number of threads for parallelization. Default 0 allows ducc0 to parallelize.

0

Returns:

Type Description
dict

Dictionary with all arguments for the ducc0 synthesis and adjoint_synthesis.

Source code in src/serval/sht.py
def healpix_map_base(nside: int, lmax: int, spin: int = 0, nthreads: int = 0) -> dict:
    r"""Create the base for the transformation of the 1D healpix map to spherical coordinates.

    Parameters
    ----------
    nside : int
        The map size.
    lmax : int
        The maximum degree of the alms.
    spin : int
        The weights used for the map or alms. Default 0.
    nthreads : int
        The number of threads for parallelization. Default 0 allows ducc0 to parallelize.

    Returns
    -------
    dict
        Dictionary with all arguments for the ducc0 synthesis and adjoint_synthesis.
    """
    healpix_base = ducc0.healpix.Healpix_Base(nside, "RING")
    info = healpix_base.sht_info()

    return dict(
        theta=info["theta"],
        nphi=info["nphi"],
        phi0=info["phi0"],
        ringstart=info["ringstart"],
        lmax=lmax,
        mmax=lmax,
        spin=spin,  # --> 0, no weights
        nthreads=nthreads,  # --> 0, let ducc0 decide how to parallelize
    )

healpix_map_to_alm(map1D, lmax, niter=3)

Calculate the Spherical Harmonics coeffeicients (alm) corresponding to the inputted 1D HEALPix map.

Parameters:

Name Type Description Default
map1D NDArray[float64]

1D Healpix map of size npix = 12 * nside ** 2.

required
lmax int

The maximum degree of the alms.

required
niter int

The number of iterations for the convergence to the solution. healpy uses default 3 so the default for this function is also set to 3.

3

Returns:

Name Type Description
alm NDArray[complex128]

A 1D array with the Spherical Harmonics coefficients derived from the map.

Source code in src/serval/sht.py
def healpix_map_to_alm(
    map1D: npt.NDArray[np.float64], lmax: int, niter: int = 3
) -> npt.NDArray[np.complex128]:
    r"""Calculate the Spherical Harmonics coeffeicients (alm) corresponding to the inputted
    1D HEALPix map.

    Parameters
    ----------
    map1D : npt.NDArray[np.float64]
        1D Healpix map of size npix = 12 * nside ** 2.
    lmax : int
        The maximum degree of the alms.
    niter: int
        The number of iterations for the convergence to the solution. healpy uses default 3 so
        the default for this function is also set to 3.

    Returns
    -------
    alm : npt.NDArray[np.complex128]
        A 1D array with the Spherical Harmonics coefficients derived from the map.
    """
    if map1D.ndim != 1:
        raise ValueError("The HEALPix map should be 1D.")
    if not isinstance(lmax, int):
        raise TypeError("lmax must be an integer.")
    if lmax < 0:
        raise ValueError("lmax should be non-negative.")

    npix = len(map1D)
    if npix % 12 != 0:
        raise ValueError("The size of the HEALPix map is not valid.")

    nside = math.isqrt(npix // 12)
    if 12 * nside**2 != npix:
        raise ValueError("the size of the HEALPix map is not valid.")

    kwargs = healpix_map_base(nside, lmax)

    map2D = map1D[None, :]
    normalization = 4.0 * math.pi / npix
    alm = ducc0_map2alm(map2D * normalization, kwargs)
    for _ in range(niter):
        alm -= ducc0_map2alm((ducc0_alm2map(alm, kwargs) - map2D) * normalization, kwargs)

    return alm[0]

make_conjugate(sparse_coeff, return_sparse=True)

make_conjugate(
    sparse_coeff: sparse.COO,
    return_sparse: Literal[True] = ...,
) -> sparse.COO
make_conjugate(
    sparse_coeff: sparse.COO,
    return_sparse: Literal[False] = ...,
) -> npt.NDArray[np.complex128]

Transforms spherical harmonics coefficients to their conjugate form. It can return either a sparse or dense array.

Parameters:

Name Type Description Default
sparse_coeff COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required
return_sparse bool

Whether to return a sparse array or a dense one.

True

Returns:

Type Description
COO | NDArray[complex128]

The conjugate spherical harmonics coefficients array in standard Fourier order, in sparse or dense form.

Source code in src/serval/sht.py
def make_conjugate(
    sparse_coeff: sparse.COO, return_sparse: bool = True
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Transforms spherical harmonics coefficients to their conjugate form. It can return
    either a sparse or dense array.
    Parameters
    ----------
    sparse_coeff : sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    return_sparse : bool, optional
        Whether to return a sparse array or a dense one.
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        The conjugate spherical harmonics coefficients array in standard Fourier order,
        in sparse or dense form.
    """
    lmax = sparse_coeff.shape[0] - 1
    ms = np.arange(sparse_coeff.shape[1]) - lmax
    # This densifies always, could warn but should only be used in testing anyway
    out = (-1) ** np.abs(ms) * sparse_coeff.todense()[:, ::-1]
    if return_sparse:
        return sparse.COO.from_numpy(out)
    else:
        return out

ms_from_array(arr)

Creates a range of integers corresponding to the m values of spherical harmonics from -mmax to +mmax according to the order lmax of the input array.

Parameters:

Name Type Description Default
arr NDArray[complex128]

Any array of shape compatible with spherical harmonics coefficients where the first dimension corresponds to lmax + 1.

required

Returns:

Type Description
NDArray[int_]

An array of integers representing the m values from -mmax to +mmax.

Source code in src/serval/sht.py
def ms_from_array(arr: npt.NDArray[np.complex128]) -> npt.NDArray[np.int_]:
    r"""Creates a range of integers corresponding to the m values of spherical harmonics
    from -mmax to +mmax according to the order lmax of the input array.
    Parameters
    ----------
    arr : npt.NDArray[np.complex128]
        Any array of shape compatible with spherical harmonics coefficients where
        the first dimension corresponds to lmax + 1.
    Returns
    -------
    npt.NDArray[np.int_]
        An array of integers representing the m values from -mmax to +mmax.
    """
    # Gets ms from an mmode array (or anything that has the right shape for one)
    # Assumes no bandlimit in m
    return np.arange(arr.shape[0]) - (arr.shape[0] - 1) // 2

nonzero_bandlimits(coeff)

Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse array based on the non-zero entries.

Parameters:

Name Type Description Default
coeff COO | NDArray[complex128]

A spherical harmonics coefficients sparse array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required

Returns:

Type Description
tuple[int, int]

A tuple containing the maximum spherical harmonic degree lmax and maximum order mmax based on the non-zero coefficients.

Source code in src/serval/sht.py
def nonzero_bandlimits(
    coeff: sparse.COO | npt.NDArray[np.complex128],
) -> tuple[int, int]:
    r"""Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse
    array based on the non-zero entries.
    Parameters
    ----------
    coeff : sparse.COO | npt.NDArray[np.complex128]
        A spherical harmonics coefficients sparse array in standard Fourier order. Its shape
        should be (lmax + 1, 2 * mmax + 1).
    Returns
    -------
    tuple[int, int]
        A tuple containing the maximum spherical harmonic degree lmax and
        maximum order mmax based on the non-zero coefficients.
    """
    nonzero = coeff.nonzero()
    ells = nonzero[0]
    ms = nonzero[1] - (coeff.shape[1] - 1) // 2
    return int(ells.max()), int(np.abs(ms).max())

serval_to_ducc0_alm(alm, lmax, mmax)

Convert SERVAL alm to the (aR, aI) pair expected by ducc0.sht.rotate_alm.

SERVAL stores complex SH coefficients in a dense (..., lmax+1, 2*mmax+1) array indexed as alm[..., l, m + mmax]. ducc0.sht.rotate_alm internally decomposes a complex field into two real fields whose m >= 0 triangular coefficients are stored flat. This function performs that decomposition and the associated normalization conversion (pyshtools ortho + csphase=1 → ducc0 internal 4π norm).

The decomposition follows the relation used by pyshtools' ducc0 backend:

\[a_{R,\ell m} = \frac{(-1)^m\,c_{\ell,m} + c_{\ell,-m}^*}{2}, \qquad a_{I,\ell m} = \frac{i(c_{\ell,-m}^* - (-1)^m\,c_{\ell,m})}{2} \quad (m > 0)\]

with \(a_{R,\ell 0} = \mathrm{Re}(c_{\ell,0})\), \(a_{I,\ell 0} = \mathrm{Im}(c_{\ell,0})\). The normalization factors cancel between the ortho→4π conversion and the real-field reconstruction, yielding the compact expressions above.

Parameters:

Name Type Description Default
alm NDArray[complex128]

SERVAL alm array of shape (..., lmax+1, 2*mmax+1).

required
lmax int

Maximum degree.

required
mmax int

Maximum order stored in alm (may be less than lmax).

required

Returns:

Type Description
aR, aI : npt.NDArray[np.complex128]

Both have shape (..., ncoeff) where ncoeff = (lmax+1)*(lmax+2)//2. Entries for m > mmax are zero. Pass directly to ducc0.sht.rotate_alm.

Source code in src/serval/sht.py
def serval_to_ducc0_alm(
    alm: npt.NDArray[np.complex128],
    lmax: int,
    mmax: int,
) -> tuple[npt.NDArray[np.complex128], npt.NDArray[np.complex128]]:
    r"""Convert SERVAL alm to the ``(aR, aI)`` pair expected by ``ducc0.sht.rotate_alm``.

    SERVAL stores complex SH coefficients in a dense ``(..., lmax+1, 2*mmax+1)`` array
    indexed as ``alm[..., l, m + mmax]``.  ``ducc0.sht.rotate_alm`` internally
    decomposes a complex field into two real fields whose ``m >= 0`` triangular
    coefficients are stored flat.  This function performs that decomposition and
    the associated normalization conversion (pyshtools ortho + csphase=1  →  ducc0
    internal 4Ï€ norm).

    The decomposition follows the relation used by pyshtools' ducc0 backend:

    $$a_{R,\ell m} = \frac{(-1)^m\,c_{\ell,m} + c_{\ell,-m}^*}{2}, \qquad
    a_{I,\ell m} = \frac{i(c_{\ell,-m}^* - (-1)^m\,c_{\ell,m})}{2} \quad (m > 0)$$

    with $a_{R,\ell 0} = \mathrm{Re}(c_{\ell,0})$, $a_{I,\ell 0} = \mathrm{Im}(c_{\ell,0})$.
    The normalization factors cancel between the ortho→4π conversion and the
    real-field reconstruction, yielding the compact expressions above.

    Parameters
    ----------
    alm : npt.NDArray[np.complex128]
        SERVAL alm array of shape ``(..., lmax+1, 2*mmax+1)``.
    lmax : int
        Maximum degree.
    mmax : int
        Maximum order stored in *alm* (may be less than *lmax*).

    Returns
    -------
    aR, aI : npt.NDArray[np.complex128]
        Both have shape ``(..., ncoeff)`` where ``ncoeff = (lmax+1)*(lmax+2)//2``.
        Entries for ``m > mmax`` are zero.  Pass directly to ``ducc0.sht.rotate_alm``.
    """
    ls_arr, ms_arr, m0_mask, mpos_mask = _ducc0_triangular_indices(lmax)
    ncoeff = len(ms_arr)
    batch_shape = alm.shape[:-2]
    aR = empty_complex_array((*batch_shape, ncoeff))
    aI = empty_complex_array((*batch_shape, ncoeff))

    # m=0: ortho→4π and real-field norms cancel, leaving real/imag parts directly
    aR[..., m0_mask] = alm[..., ls_arr[m0_mask], mmax].real
    aI[..., m0_mask] = alm[..., ls_arr[m0_mask], mmax].imag

    # m>0 entries that are present in the SERVAL array
    fill_mpos = mpos_mask & (ms_arr <= mmax)
    if np.any(fill_mpos):
        ls_v = ls_arr[fill_mpos]
        ms_v = ms_arr[fill_mpos]
        sign_v = (-1.0) ** ms_v
        a_pos = alm[..., ls_v, mmax + ms_v]
        a_neg = alm[..., ls_v, mmax - ms_v]
        aR[..., fill_mpos] = (sign_v * a_pos + a_neg.conj()) / 2
        aI[..., fill_mpos] = 1j * (a_neg.conj() - sign_v * a_pos) / 2

    return aR, aI

set_bandlimits(coeffs, lmax, mmax=None)

Reshapes sparse or dense spherical harmonics coefficients to given bandlimits.

Operates on the last two axes of coeffs, treating any preceding axes as batch dimensions. Sparse (sparse.COO) inputs are supported only for 2-D arrays.

Parameters:

Name Type Description Default
coeffs Coeffs

A sparse or dense spherical harmonics coefficients array in standard Fourier order. The last two axes should have shape (lmax + 1, 2 * mmax + 1).

required
lmax int

The target maximum spherical harmonic degree.

required
mmax int | None

The target maximum spherical harmonic order. If None, it is set equal to lmax.

None

Returns:

Type Description
Coeffs

A sparse or dense spherical harmonics coefficients array reshaped to the target bandlimits.

Source code in src/serval/sht.py
def set_bandlimits(coeffs: Coeffs, lmax: int, mmax: int | None = None) -> Coeffs:
    r"""Reshapes sparse or dense spherical harmonics coefficients to given bandlimits.

    Operates on the last two axes of ``coeffs``, treating any preceding axes as
    batch dimensions.  Sparse (``sparse.COO``) inputs are supported only for 2-D
    arrays.

    Parameters
    ----------
    coeffs : Coeffs
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        The last two axes should have shape ``(lmax + 1, 2 * mmax + 1)``.
    lmax : int
        The target maximum spherical harmonic degree.
    mmax : int | None, optional
        The target maximum spherical harmonic order. If None, it is set equal to lmax.
    Returns
    -------
    Coeffs
        A sparse or dense spherical harmonics coefficients array reshaped to the target bandlimits.
    """
    xp = array_namespace(coeffs)
    if mmax is None:
        mmax = lmax
    out = coeffs.copy()
    lead = tuple((0, 0) for _ in range(out.ndim - 2))
    start_lmax = coeffs.shape[-2] - 1
    delta_lmax = start_lmax - lmax
    if delta_lmax > 0:
        out = out[..., :-delta_lmax, :]
    elif delta_lmax < 0:
        pad_width = (*lead, (0, abs(delta_lmax)), (0, 0))
        out = xp.pad(out, pad_width, constant_values=0.0)
    start_mmax = (coeffs.shape[-1] - 1) // 2
    delta_mmax = start_mmax - mmax
    if delta_mmax > 0:
        out = out[..., :, delta_mmax:-delta_mmax]
    elif delta_mmax < 0:
        pad_width = (*lead, (0, 0), (abs(delta_mmax), abs(delta_mmax)))
        out = xp.pad(out, pad_width, constant_values=0.0)
    return out

sparse_synthesis(sparse_coeff)

Transfomrs spherical harmonics coefficients from a sparse array to a pyshtools grid.

Parameters:

Name Type Description Default
sparse_coeff COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required

Returns:

Type Description
SHGrid

A spherical harmonic grid object.

Source code in src/serval/sht.py
def sparse_synthesis(sparse_coeff: sparse.COO) -> pysh.SHGrid:
    r"""Transfomrs spherical harmonics coefficients from a sparse array to a pyshtools grid.
    Parameters
    ----------
    sparse_coeff : sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    Returns
    -------
    pysh.SHGrid
        A spherical harmonic grid object.
    """
    return from_sparse(sparse_coeff).expand(
        grid=GRID_CONVENTIONS["grid"], extend=GRID_CONVENTIONS["extend"]
    )

stack_ms(coeffs)

Reorganizes spherical harmonics coefficients from the pyshtools format to the standard Fourrier order.

Parameters:

Name Type Description Default
coeffs NDArray[complex128]

A spherical harmonics coefficients array in pyshtools format. Its shape should be (2, lmax + 1, mmax + 1), where the first dimension corresponds to negative and positive m values.

required

Returns:

Type Description
NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order. Its shape will be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.

Source code in src/serval/sht.py
def stack_ms(coeffs: npt.NDArray[np.complex128]) -> npt.NDArray[np.complex128]:
    r"""Reorganizes spherical harmonics coefficients from the pyshtools format
    to the standard Fourrier order.
    Parameters
    ----------
    coeffs : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in pyshtools format. Its shape
        should be (2, lmax + 1, mmax + 1), where the first dimension
        corresponds to negative and positive m values.
    Returns
    -------
    npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order. Its shape
        will be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.
    """
    neg_ms = coeffs[1, :, :0:-1]
    pos_ms = coeffs[0, ...]
    return np.concatenate([neg_ms, pos_ms], axis=-1)

threshold_bandlimits(coeff, threshold=SPARSE_THRESHOLD)

Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse or dense array based on the non-zero entries, after setting the coefficients below a given threshold to zero.

Parameters:

Name Type Description Default
coeff COO | NDArray[complex128]

A sparse or dense spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1).

required
threshold float

The threshold below which coefficients are set to zero.

SPARSE_THRESHOLD

Returns:

Type Description
tuple[int, int]

A tuple containing the maximum spherical harmonic degree lmax and maximum order mmax based on the filtered coefficients.

Source code in src/serval/sht.py
def threshold_bandlimits(
    coeff: sparse.COO | npt.NDArray[np.complex128], threshold: float = SPARSE_THRESHOLD
) -> tuple[int, int]:
    r"""Computes the bandlimits lmax, mmax of spherical harmonics coefficients from a sparse or
    dense array based on the non-zero entries, after setting the coefficients below a given
    threshold to zero.
    Parameters
    ----------
    coeff : sparse.COO | npt.NDArray[np.complex128]
        A sparse or dense spherical harmonics coefficients array in standard Fourier order.
        Its shape should be (lmax + 1, 2 * mmax + 1).
    threshold : float, optional
        The threshold below which coefficients are set to zero.
    Returns
    -------
    tuple[int, int]
        A tuple containing the maximum spherical harmonic degree lmax and
        maximum order mmax based on the filtered coefficients.
    """
    xp = array_namespace(coeff)
    return nonzero_bandlimits(xp.where(sparse.abs(coeff) <= threshold, 0.0, coeff))

to_sparse(coeffs, threshold=SPARSE_THRESHOLD)

Transfomrs spherical harmonics coefficients from pyshtools format to a sparse array in standard Fourier order.

Parameters:

Name Type Description Default
coeffs SHCoeffs

A spherical harmonics coefficients object in pyshtools format. Its shape should be (2, lmax + 1, mmax + 1).

required
threshold float

The threshold below which coefficients are set to zero.

SPARSE_THRESHOLD

Returns:

Type Description
COO

A sparse spherical harmonics coefficients array in standard Fourier order. Its shape will be (lmax + 1, 2 * mmax + 1).

Source code in src/serval/sht.py
def to_sparse(coeffs: pysh.SHCoeffs, threshold: float = SPARSE_THRESHOLD) -> sparse.COO:
    r"""Transfomrs spherical harmonics coefficients from pyshtools format to a sparse array
    in standard Fourier order.
    Parameters
    ----------
    coeffs : pysh.SHCoeffs
        A spherical harmonics coefficients object in pyshtools format. Its shape
        should be (2, lmax + 1, mmax + 1).
    threshold : float, optional
        The threshold below which coefficients are set to zero.
    Returns
    -------
    sparse.COO
        A sparse spherical harmonics coefficients array in standard Fourier order.
        Its shape will be (lmax + 1, 2 * mmax + 1).
    """
    out = sparse.COO.from_numpy(stack_ms(coeffs.coeffs))
    out = sparse.where(sparse.abs(out) < threshold, 0.0, out)
    return out

unstack_ms(coeffs)

Reorganizes spherical harmonics coefficients from the standard Fourier order to the pyshtools format.

Parameters:

Name Type Description Default
coeffs NDArray[complex128]

A spherical harmonics coefficients array in standard Fourier order. Its shape should be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.

required

Returns:

Type Description
NDArray[complex128]

A spherical harmonics coefficients array in pyshtools format. Its shape will be (2, lmax + 1, mmax + 1), where the first dimension corresponds to negative and positive m values.

Source code in src/serval/sht.py
def unstack_ms(coeffs: npt.NDArray[np.complex128]) -> npt.NDArray[np.complex128]:
    r"""Reorganizes spherical harmonics coefficients from the standard Fourier order
    to the pyshtools format.
    Parameters
    ----------
    coeffs : npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in standard Fourier order. Its shape
        should be (lmax + 1, 2 * mmax + 1), with m values ranging from -mmax to +mmax.
    Returns
    -------
    npt.NDArray[np.complex128]
        A spherical harmonics coefficients array in pyshtools format. Its shape
        will be (2, lmax + 1, mmax + 1), where the first dimension
        corresponds to negative and positive m values.
    """
    mmax = (coeffs.shape[1] - 1) // 2
    out = np.zeros(shape=(2, coeffs.shape[0], mmax + 1), dtype=coeffs.dtype)
    out[0, :, :] = coeffs[:, mmax:]
    out[1, :, 1:] = coeffs[:, mmax - 1 :: -1]
    return out

General Utilities (serval.utils.py)

airy_pattern(freq, theta, phi, D_eff, asymmetry_ratio=1.0, asymmetry_angle=0.0, power=False, fwhm_fac=AIRY_FWHM_FACTOR)

Evaluate an Airy disk beam pattern in pointing-frame coordinates.

The voltage pattern is \(2 J_1(x) / x\) with \(x = \pi \cdot \mathrm{AIRY\_FWHM\_FACTOR} \cdot D_\mathrm{eff} \sin\theta / (\mathrm{fwhm\_fac} \cdot \lambda)\), so the power-beam FWHM equals \(\mathrm{fwhm\_fac} \cdot \lambda / D_\mathrm{eff}\) exactly in \(\sin\theta\). Use gaussian_pattern with use_sin_theta=True for a Gaussian whose FWHM is defined in the same \(\sin\theta\) coordinate.

Parameters:

Name Type Description Default
freq float or array_like

Frequency in MHz, broadcastable to (n_freq, ntheta, nphi).

required
theta array_like

Pointing-frame colatitude in radians, shape (ntheta, nphi).

required
phi array_like

Pointing-frame azimuth in radians, shape (ntheta, nphi).

required
D_eff float

Effective dish diameter in metres.

required
asymmetry_ratio float

Ratio of semi-major to semi-minor aperture (\(\geq 1\)). Default 1.0.

1.0
asymmetry_angle float

Orientation of the major axis from \(\phi = 0\), in radians. Default 0.0.

0.0
power bool

If True, return a power beam (squared amplitude); otherwise return the voltage amplitude. Default False.

False
fwhm_fac float

Power-beam FWHM in units of \(\lambda / D_\mathrm{eff}\). Default AIRY_FWHM_FACTOR (~1.029), giving the true Airy pattern for a uniformly-illuminated circular aperture.

AIRY_FWHM_FACTOR

Returns:

Type Description
NDArray[float64]

Beam pattern evaluated on the input grid.

Source code in src/serval/utils.py
def airy_pattern(
    freq: float | npt.NDArray[np.float64],
    theta: float | npt.NDArray[np.float64],
    phi: float | npt.NDArray[np.float64],
    D_eff: float,
    asymmetry_ratio: float = 1.0,
    asymmetry_angle: float = 0.0,
    power=False,
    fwhm_fac=AIRY_FWHM_FACTOR,
):
    r"""Evaluate an Airy disk beam pattern in pointing-frame coordinates.

    The voltage pattern is $2 J_1(x) / x$ with
    $x = \pi \cdot \mathrm{AIRY\_FWHM\_FACTOR} \cdot D_\mathrm{eff} \sin\theta
    / (\mathrm{fwhm\_fac} \cdot \lambda)$, so the power-beam FWHM equals
    $\mathrm{fwhm\_fac} \cdot \lambda / D_\mathrm{eff}$ exactly in $\sin\theta$.
    Use [gaussian_pattern][serval.utils.gaussian_pattern] with ``use_sin_theta=True``
    for a Gaussian whose FWHM is defined in the same $\sin\theta$ coordinate.

    Parameters
    ----------
    freq : float or array_like
        Frequency in MHz, broadcastable to ``(n_freq, ntheta, nphi)``.
    theta : array_like
        Pointing-frame colatitude in radians, shape ``(ntheta, nphi)``.
    phi : array_like
        Pointing-frame azimuth in radians, shape ``(ntheta, nphi)``.
    D_eff : float
        Effective dish diameter in metres.
    asymmetry_ratio : float
        Ratio of semi-major to semi-minor aperture ($\geq 1$).  Default 1.0.
    asymmetry_angle : float
        Orientation of the major axis from $\phi = 0$, in radians.  Default 0.0.
    power : bool
        If ``True``, return a power beam (squared amplitude); otherwise return
        the voltage amplitude.  Default ``False``.
    fwhm_fac : float
        Power-beam FWHM in units of $\lambda / D_\mathrm{eff}$.  Default
        `AIRY_FWHM_FACTOR` (~1.029), giving the true Airy pattern for a
        uniformly-illuminated circular aperture.

    Returns
    -------
    npt.NDArray[np.float64]
        Beam pattern evaluated on the input grid.
    """
    wl = (freq * units.MHz).to("m", equivalencies=units.spectral()).value
    if asymmetry_ratio != 1.0:
        cos_a = np.cos(phi - asymmetry_angle)
        sin_a = np.sin(phi - asymmetry_angle)
        D_phi = D_eff / np.sqrt(cos_a**2 + sin_a**2 / asymmetry_ratio**2)
    else:
        D_phi = D_eff
    x = np.pi * AIRY_FWHM_FACTOR * D_phi * np.sin(theta) / (wl * fwhm_fac)
    with np.errstate(invalid="ignore", divide="ignore"):
        airy_amp = 2 * j1(x) / x
        airy_amp[x == 0] = 1.0
    if power:
        return airy_amp**2
    else:
        return airy_amp

analytic_plane_wave_al(ells, kz)

Computes the analytic spherical harmonic coefficients of a plane wave propagating in the z-direction.

Parameters:

Name Type Description Default
ells NDArray[int64]

The spherical harmonic degrees at which to evaluate the coefficients.

required
kz NDArray[float64]

The wavevector magnitudes at which to evaluate the coefficients.

required

Returns:

Type Description
NDArray[complex128]

The spherical harmonic coefficients of the plane wave at the specified degrees and wavevector magnitudes.

Source code in src/serval/utils.py
def analytic_plane_wave_al(
    ells: npt.NDArray[np.int64], kz: npt.NDArray[np.float64]
) -> npt.NDArray[np.complex128]:
    r"""Computes the analytic spherical harmonic coefficients of a plane wave
    propagating in the z-direction.
    Parameters
    ----------
    ells : npt.NDArray[np.int64]
        The spherical harmonic degrees at which to evaluate the coefficients.
    kz : npt.NDArray[np.float64]
        The wavevector magnitudes at which to evaluate the coefficients.
    Returns
    -------
    npt.NDArray[np.complex128]
        The spherical harmonic coefficients of the plane wave at the specified
        degrees and wavevector magnitudes.
    """
    return spherical_jn(ells, kz) * np.sqrt((2 * ells + 1) * 4 * np.pi) * (1j) ** ells

bandlimited_random_plane_wave(lmax, template_grid, rng, threshold=SPARSE_THRESHOLD)

Generates a random band-limited plane wave on the sphere evaluated on the grid.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree for band-limiting.

required
template_grid SHGrid

A pyshtools SHGrid object that serves as a template for the output grid.

required
rng Generator | None

An optional random number generator for reproducibility. If None, a new generator will be created. Default is None.

required
threshold float

The threshold for determining the plane wave magnitude from the bandlimit.

SPARSE_THRESHOLD

Returns:

Type Description
tuple[SHGrid, NDArray[float64]]

A tuple containing the generated band-limited plane wave grid and the corresponding wavevector.

Source code in src/serval/utils.py
def bandlimited_random_plane_wave(
    lmax: int,
    template_grid: pysh.SHGrid,
    rng: np.random.Generator | None,
    threshold: float = SPARSE_THRESHOLD,
) -> tuple[pysh.SHGrid, npt.NDArray[np.float64]]:
    r"""Generates a random band-limited plane wave on the sphere evaluated on the grid.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree for band-limiting.
    template_grid : pysh.SHGrid
        A pyshtools SHGrid object that serves as a template for the output grid.
    rng : np.random.Generator | None
        An optional random number generator for reproducibility. If None, a new generator
        will be created. Default is None.
    threshold : float, optional
        The threshold for determining the plane wave magnitude from the bandlimit.
    Returns
    -------
    tuple[pysh.SHGrid, npt.NDArray[np.float64]]
        A tuple containing the generated band-limited plane wave grid and the
        corresponding wavevector.
    """
    if rng is None:
        rng = np.random.default_rng()
    unrot_k = np.array([0, 0, plane_wave_mag_from_bandlimit(lmax, threshold=threshold)])
    eulers = rng.random(3) * np.array([2 * np.pi, np.pi, 2 * np.pi])
    rot_mat = rotation_from_eulers(eulers)
    k = rot_mat.apply(unrot_k)
    plane_wave_grid = plane_waves(
        [
            k,
        ],
        template_grid,
    )
    return (
        plane_wave_grid,
        k,
    )

gaussian_pattern(freq, theta, phi, D_eff, asymmetry_ratio=1.0, asymmetry_angle=0.0, power=False, fwhm_fac=AIRY_FWHM_FACTOR, use_sin_theta=False)

Evaluate a Gaussian beam pattern in pointing-frame coordinates.

The FWHM of the power beam along the major axis is \(\mathrm{fwhm\_fac} \cdot \lambda / D_\mathrm{eff}\), exact in \(\theta\) by default. Setting use_sin_theta=True replaces \(\theta\) with \(\sin\theta\) throughout, making the FWHM exact in \(\sin\theta\) — directly comparable to airy_pattern. An optional elliptical asymmetry stretches the beam along a specified axis.

Parameters:

Name Type Description Default
freq float or array_like

Frequency in MHz, broadcastable to (n_freq, ntheta, nphi).

required
theta array_like

Pointing-frame colatitude in radians, shape (ntheta, nphi).

required
phi array_like

Pointing-frame azimuth in radians, shape (ntheta, nphi).

required
D_eff float

Effective dish diameter in metres.

required
asymmetry_ratio float

Ratio of semi-major to semi-minor beam width (\(\geq 1\)). Default 1.0.

1.0
asymmetry_angle float

Orientation of the major axis from \(\phi = 0\), in radians. Default 0.0.

0.0
power bool

If True, return a power beam (\(e^{-\mathrm{exponent}/2}\)); otherwise return a voltage amplitude (\(e^{-\mathrm{exponent}/4}\)). Default False.

False
fwhm_fac float

Power-beam FWHM in units of \(\lambda / D_\mathrm{eff}\). Default AIRY_FWHM_FACTOR (~1.029), matching the FWHM of a uniformly-illuminated circular aperture.

AIRY_FWHM_FACTOR
use_sin_theta bool

If True, use \(\sin\theta\) instead of \(\theta\) in the Gaussian exponent. The FWHM is then exact in \(\sin\theta\), matching the physically motivated airy_pattern. Default False.

False

Returns:

Type Description
NDArray[float64]

Beam pattern evaluated on the input grid.

Source code in src/serval/utils.py
def gaussian_pattern(
    freq: float | npt.NDArray[np.float64],
    theta: float | npt.NDArray[np.float64],
    phi: float | npt.NDArray[np.float64],
    D_eff: float,
    asymmetry_ratio: float = 1.0,
    asymmetry_angle: float = 0.0,
    power=False,
    fwhm_fac=AIRY_FWHM_FACTOR,
    use_sin_theta: bool = False,
):
    r"""Evaluate a Gaussian beam pattern in pointing-frame coordinates.

    The FWHM of the power beam along the major axis is $\mathrm{fwhm\_fac} \cdot
    \lambda / D_\mathrm{eff}$, exact in $\theta$ by default.  Setting
    ``use_sin_theta=True`` replaces $\theta$ with $\sin\theta$ throughout, making
    the FWHM exact in $\sin\theta$ — directly comparable to
    [airy_pattern][serval.utils.airy_pattern].  An optional elliptical asymmetry
    stretches the beam along a specified axis.

    Parameters
    ----------
    freq : float or array_like
        Frequency in MHz, broadcastable to ``(n_freq, ntheta, nphi)``.
    theta : array_like
        Pointing-frame colatitude in radians, shape ``(ntheta, nphi)``.
    phi : array_like
        Pointing-frame azimuth in radians, shape ``(ntheta, nphi)``.
    D_eff : float
        Effective dish diameter in metres.
    asymmetry_ratio : float
        Ratio of semi-major to semi-minor beam width ($\geq 1$).  Default 1.0.
    asymmetry_angle : float
        Orientation of the major axis from $\phi = 0$, in radians.  Default 0.0.
    power : bool
        If ``True``, return a power beam ($e^{-\mathrm{exponent}/2}$); otherwise
        return a voltage amplitude ($e^{-\mathrm{exponent}/4}$).  Default ``False``.
    fwhm_fac : float
        Power-beam FWHM in units of $\lambda / D_\mathrm{eff}$.  Default
        `AIRY_FWHM_FACTOR` (~1.029), matching the FWHM of a uniformly-illuminated
        circular aperture.
    use_sin_theta : bool
        If ``True``, use $\sin\theta$ instead of $\theta$ in the Gaussian exponent.
        The FWHM is then exact in $\sin\theta$, matching the physically motivated
        [airy_pattern][serval.utils.airy_pattern].  Default ``False``.

    Returns
    -------
    npt.NDArray[np.float64]
        Beam pattern evaluated on the input grid.
    """
    wl = (freq * units.MHz).to("m", equivalencies=units.spectral()).value
    sigma = (fwhm_fac * wl / D_eff) * gaussian_fwhm_to_sigma
    t = np.sin(theta) if use_sin_theta else theta
    if asymmetry_ratio != 1.0:
        sigma_major = sigma
        sigma_minor = sigma / asymmetry_ratio
        x = t * np.cos(phi - asymmetry_angle)
        y = t * np.sin(phi - asymmetry_angle)
        exponent = x**2 / sigma_major**2 + y**2 / sigma_minor**2
    else:
        exponent = t**2 / sigma**2
    if power:
        return np.exp(-exponent / 2)
    else:
        return np.exp(-exponent / 4)

grid_theta_phi(grid, meshgrid=False)

Return the colatitude and azimuth arrays for a pyshtools grid.

Converts the grid latitude and longitude (in degrees) to physics convention spherical coordinates: colatitude \(\theta \in [0, \pi]\) (with \(\theta = 0\) at the north pole) and azimuth \(\phi \in [0, 2\pi)\).

Parameters:

Name Type Description Default
grid SHGrid

A pyshtools grid object.

required
meshgrid bool

If True, return 2-D arrays broadcastable to the grid shape (nlat, nlon) (as from numpy.meshgrid with indexing="ij"). Default is False.

False

Returns:

Name Type Description
theta NDArray[float64]

Colatitude in radians. Shape (nlat,) or (nlat, nlon).

phi NDArray[float64]

Azimuth in radians. Shape (nlon,) or (nlat, nlon).

Source code in src/serval/utils.py
def grid_theta_phi(
    grid: pysh.SHGrid,
    meshgrid: bool = False,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
    r"""Return the colatitude and azimuth arrays for a pyshtools grid.

    Converts the grid latitude and longitude (in degrees) to physics
    convention spherical coordinates: colatitude $\theta \in [0, \pi]$
    (with $\theta = 0$ at the north pole) and azimuth
    $\phi \in [0, 2\pi)$.

    Parameters
    ----------
    grid : pysh.SHGrid
        A pyshtools grid object.
    meshgrid : bool
        If True, return 2-D arrays broadcastable to the grid shape
        ``(nlat, nlon)`` (as from ``numpy.meshgrid`` with
        ``indexing="ij"``).  Default is False.

    Returns
    -------
    theta : npt.NDArray[np.float64]
        Colatitude in radians.  Shape ``(nlat,)`` or ``(nlat, nlon)``.
    phi : npt.NDArray[np.float64]
        Azimuth in radians.  Shape ``(nlon,)`` or ``(nlat, nlon)``.
    """
    theta = np.deg2rad(90.0 - grid.lats())
    phi = np.deg2rad(grid.lons())
    if meshgrid:
        theta, phi = np.meshgrid(theta, phi, indexing="ij")
    return theta, phi

harmonic_point_source(ra_deg, dec_deg, lmax, return_sparse=True)

harmonic_point_source(
    ra_deg: float,
    dec_deg: float,
    lmax: int,
    return_sparse: Literal[True] = ...,
) -> sparse.COO
harmonic_point_source(
    ra_deg: float,
    dec_deg: float,
    lmax: int,
    return_sparse: Literal[False] = ...,
) -> npt.NDArray[np.complex128]

Generates band-limited spherical harmonic coefficients for a point source at given RA and Dec.

Parameters:

Name Type Description Default
ra_deg float

The right ascension of the point source in degrees.

required
dec_deg float

The declination of the point source in degrees.

required
lmax int

The maximum spherical harmonic degree.

required
return_sparse bool

If True (default), return a sparse.COO array. If False, return a dense numpy array of shape (lmax+1, 2*lmax+1).

True

Returns:

Type Description
COO | NDArray[complex128]

The band-limited spherical harmonic coefficients of the point source in standard Fourier order, shape (lmax+1, 2*lmax+1).

Source code in src/serval/utils.py
def harmonic_point_source(
    ra_deg: float, dec_deg: float, lmax: int, return_sparse: bool = True
) -> sparse.COO | npt.NDArray[np.complex128]:
    r"""Generates band-limited spherical harmonic coefficients for a point source
    at given RA and Dec.
    Parameters
    ----------
    ra_deg : float
        The right ascension of the point source in degrees.
    dec_deg : float
        The declination of the point source in degrees.
    lmax : int
        The maximum spherical harmonic degree.
    return_sparse : bool, optional
        If True (default), return a sparse.COO array. If False, return a dense
        numpy array of shape (lmax+1, 2*lmax+1).
    Returns
    -------
    sparse.COO | npt.NDArray[np.complex128]
        The band-limited spherical harmonic coefficients of the point source in
        standard Fourier order, shape (lmax+1, 2*lmax+1).
    """
    ps_theta = np.pi / 2 - np.radians(dec_deg)
    ps_phi = np.radians(ra_deg)
    eulers = inv_eulers((ps_phi, ps_theta, 0))
    # wigner_d with mprime_max=0 gives shape (lmax+1, 1, 2*lmax+1).
    D = wigner_d(lmax, 0, eulers)
    ell = np.arange(lmax + 1)
    alm = D[:, 0, :] * np.sqrt((2 * ell + 1) / (4 * np.pi))[:, np.newaxis]
    if not return_sparse:
        return alm
    out = sparse.COO.from_numpy(alm)
    return out

integral_from_coeff(coeff)

Computes the integral over the sphere from spherical harmonic coefficients.

Parameters:

Name Type Description Default
coeff SHCoeffs

The spherical harmonic coefficients to integrate in pyshtools format.

required

Returns:

Type Description
float

The integral of the function represented by the spherical harmonic coefficients over the unit sphere.

Source code in src/serval/utils.py
def integral_from_coeff(coeff: pysh.SHCoeffs) -> float:
    r"""Computes the integral over the sphere from spherical harmonic coefficients.
    Parameters
    ----------
    coeff : pysh.SHCoeffs
        The spherical harmonic coefficients to integrate in pyshtools format.
    Returns
    -------
    float
        The integral of the function represented by the spherical harmonic coefficients
        over the unit sphere.
    """
    # +, l=0, |m|=0, assumes SHT_CONVENTIONS
    return coeff.coeffs[0][0, 0] * np.sqrt(4 * np.pi)

integral_from_grid(grid_integrand)

Computes the integral over the sphere from a spherical harmonic grid.

Parameters:

Name Type Description Default
coeff SHGrid

The spherical harmonic grid to integrate in pyshtools format.

required

Returns:

Type Description
float

The integral of the function represented by the spherical harmonic grid over the unit sphere.

Source code in src/serval/utils.py
def integral_from_grid(grid_integrand: pysh.SHGrid) -> float:
    r"""Computes the integral over the sphere from a spherical harmonic grid.
    Parameters
    ----------
    coeff : pysh.SHGrid
        The spherical harmonic grid to integrate in pyshtools format.
    Returns
    -------
    float
        The integral of the function represented by the spherical harmonic grid
        over the unit sphere.
    """
    return integral_from_coeff(grid_integrand.expand(lmax_calc=0, **SHT_CONVENTIONS))

integrals_from_alm(alm)

Compute the integral over the unit sphere for each frequency channel.

Reads the l=0, m=0 coefficient directly from SERVAL's Fourier-ordered alm array, consistent with the ortho+csphase=1 SHT convention.

Parameters:

Name Type Description Default
alm (ndarray, shape(..., lmax + 1, 2 * mmax + 1))

Spherical harmonic coefficients in SERVAL Fourier format. m=0 is at column index mmax = (alm.shape[-1] - 1) // 2.

required

Returns:

Type Description
(ndarray, shape(...))

Integral of the function over the unit sphere per leading-axis element. Complex in general; real for real-valued beams.

Source code in src/serval/utils.py
def integrals_from_alm(
    alm: npt.NDArray[np.complex128],
) -> npt.NDArray[np.complex128]:
    """Compute the integral over the unit sphere for each frequency channel.

    Reads the l=0, m=0 coefficient directly from SERVAL's Fourier-ordered
    alm array, consistent with the ortho+csphase=1 SHT convention.

    Parameters
    ----------
    alm : ndarray, shape (..., lmax+1, 2*mmax+1)
        Spherical harmonic coefficients in SERVAL Fourier format.
        m=0 is at column index ``mmax = (alm.shape[-1] - 1) // 2``.

    Returns
    -------
    ndarray, shape (...,)
        Integral of the function over the unit sphere per leading-axis element.
        Complex in general; real for real-valued beams.
    """
    mmax = (alm.shape[-1] - 1) // 2
    return alm[..., 0, mmax] * np.sqrt(4 * np.pi)

mmodes_to_visibilities(mmodes, m1max=None, ms=None)

Reconstruct a visibility timestream from m-mode amplitudes.

The output is the physical visibility timestream reconstructed from the Fourier-series amplitudes in mmodes:

\[ \mathrm{vis}(\phi) = \sum_m A_m \exp\!\left(2 \pi i m \phi / 360\right). \]

Internally this is evaluated via ifft(full_mmodes) multiplied by the output length to undo NumPy's inverse-FFT normalisation, so that :func:visibilities_to_mmodes and :func:mmodes_to_visibilities are true inverses on a consistent ERA grid.

Parameters:

Name Type Description Default
mmodes NDArray[complex128]

M-mode amplitudes.

  • ms=None: full array of shape (..., n_mmodes) in the same fftshifted order produced by :func:visibilities_to_mmodes (m-values increasing from left to right).
  • ms provided: sparse array of shape (..., len(ms)) where entry i is the amplitude for m-value ms[i]. Order does not matter; ms identifies which m each entry belongs to.
required
m1max int

Maximum absolute m-value of the output grid. Required when ms is provided. The output has 2*m1max+1 ERA samples.

None
ms array of int

M-values corresponding to the last axis of the sparse mmodes input. M-values outside [-m1max, m1max] are silently dropped.

None

Returns:

Type Description
NDArray[complex128]

Visibility timestream with shape (..., n_out).

Source code in src/serval/utils.py
def mmodes_to_visibilities(
    mmodes: npt.NDArray[np.complex128],
    m1max: int | None = None,
    ms: npt.NDArray[np.int64] | None = None,
) -> npt.NDArray[np.complex128]:
    r"""Reconstruct a visibility timestream from m-mode amplitudes.

    The output is the physical visibility timestream reconstructed from the
    Fourier-series amplitudes in ``mmodes``:

    $$
    \mathrm{vis}(\phi) = \sum_m A_m \exp\!\left(2 \pi i m \phi / 360\right).
    $$

    Internally this is evaluated via ``ifft(full_mmodes)`` multiplied by the
    output length to undo NumPy's inverse-FFT normalisation, so that
    :func:`visibilities_to_mmodes` and :func:`mmodes_to_visibilities` are true
    inverses on a consistent ERA grid.

    Parameters
    ----------
    mmodes : npt.NDArray[np.complex128]
        M-mode amplitudes.

        * ``ms=None``: full array of shape ``(..., n_mmodes)`` in the same
          fftshifted order produced by :func:`visibilities_to_mmodes`
          (m-values increasing from left to right).
        * ``ms`` provided: sparse array of shape ``(..., len(ms))`` where
          entry ``i`` is the amplitude for m-value ``ms[i]``.  Order does not
          matter; ``ms`` identifies which m each entry belongs to.
    m1max : int, optional
        Maximum absolute m-value of the output grid.  Required when ``ms`` is
        provided.  The output has ``2*m1max+1`` ERA samples.
    ms : array of int, optional
        M-values corresponding to the last axis of the sparse ``mmodes`` input.
        M-values outside ``[-m1max, m1max]`` are silently dropped.
    Returns
    -------
    npt.NDArray[np.complex128]
        Visibility timestream with shape ``(..., n_out)``.
    """
    if ms is None:
        full_mmodes = np.fft.fftshift(mmodes, axes=-1)
    else:
        if not isinstance(m1max, int):
            raise TypeError("m1max must be an integer, since ms is not None.")
        if mmodes.ndim == 1:
            full_mmodes = empty_complex_array(2 * m1max + 1)
            fftfreq = np.fft.ifftshift(
                np.linspace(-m1max, m1max, 2 * m1max + 1)
            ).astype(int)
            _, comm1, _ = np.intersect1d(fftfreq, ms, return_indices=True)
            full_mmodes[comm1] = mmodes
        else:
            full_mmodes = empty_complex_array(mmodes.shape[:-1] + (2 * m1max + 1,))
            fftfreq = np.fft.ifftshift(
                np.linspace(-m1max, m1max, 2 * m1max + 1)
            ).astype(int)
            _, comm1, _ = np.intersect1d(fftfreq, ms, return_indices=True)
            full_mmodes[..., comm1] = mmodes

    return full_mmodes.shape[-1] * np.fft.ifft(full_mmodes, axis=-1)

normal_to_spherical(n)

Transforms arrays of Cartesian unit normal vectors to spherical coordinates.

Parameters:

Name Type Description Default
n NDArray[float64]

Array of Cartesian unit normal vectors, shape (..., 3).

required

Returns:

Name Type Description
theta NDArray[float64]

Colatitude angles in radians, shape (...,).

phi NDArray[float64]

Azimuthal angles in radians in [0, 2Ï€), shape (...,).

Source code in src/serval/utils.py
def normal_to_spherical(
    n: npt.NDArray[np.float64],
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
    r"""Transforms arrays of Cartesian unit normal vectors to spherical coordinates.

    Parameters
    ----------
    n : npt.NDArray[np.float64]
        Array of Cartesian unit normal vectors, shape ``(..., 3)``.

    Returns
    -------
    theta : npt.NDArray[np.float64]
        Colatitude angles in radians, shape ``(...,)``.
    phi : npt.NDArray[np.float64]
        Azimuthal angles in radians in ``[0, 2Ï€)``, shape ``(...,)``.
    """
    theta = np.arccos(np.clip(n[..., 2], -1.0, 1.0))
    phi = np.arctan2(n[..., 1], n[..., 0]) % (2 * np.pi)
    return theta, phi

plane_wave_bandlimits(k, threshold=SPARSE_THRESHOLD)

Computes the spherical harmonic bandlimits lmax, mmax required to represent a plane wave with wavevector k to a given threshold.

Parameters:

Name Type Description Default
k NDArray[float64]

The wavevector of the plane wave.

required
threshold float

The threshold for determining the bandlimits. Default is SPARSE_THRESHOLD.

SPARSE_THRESHOLD

Returns:

Type Description
tuple[int, int]

A tuple containing the maximum spherical harmonic degree lmax and maximum absolute order mmax required to represent the plane wave.

Source code in src/serval/utils.py
def plane_wave_bandlimits(
    k: npt.NDArray[np.float64], threshold: float = SPARSE_THRESHOLD
) -> tuple[int, int]:
    r"""Computes the spherical harmonic bandlimits lmax, mmax required to
    represent a plane wave with wavevector k to a given threshold.
    Parameters
    ----------
    k : npt.NDArray[np.float64]
        The wavevector of the plane wave.
    threshold : float, optional
        The threshold for determining the bandlimits. Default is SPARSE_THRESHOLD.
    Returns
    -------
    tuple[int, int]
        A tuple containing the maximum spherical harmonic degree lmax and
        maximum absolute order mmax required to represent the plane wave.
    """
    # Threshold in |al|, not power
    mag_k = np.linalg.norm(k)
    lstart = max(0, int(np.round(mag_k)) - 1)
    lstop = max(13, 10 * lstart)
    plane_wave_al = np.abs(analytic_plane_wave_al(np.arange(lstart, lstop), mag_k))
    plane_wave_al /= plane_wave_al.max()
    lmax = np.argmin(plane_wave_al >= threshold) + lstart
    if mag_k == 0.0:
        mmax = 0  # +1 maybe?
    else:
        mmax = int(np.round(lmax * np.sqrt(k[0] ** 2 + k[1] ** 2) / mag_k))  # +1 maybe?
    return int(lmax), mmax

plane_wave_mag_from_bandlimit(lmax, threshold=SPARSE_THRESHOLD)

Computes the maximum usable plane-wave wavenumber |k| that can be represented on a spherical-harmonic grid of bandlimit lmax.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree for band-limiting.

required
threshold float

The threshold for determining the plane wave magnitude from the bandlimit.

SPARSE_THRESHOLD

Returns:

Type Description
float

The maximum usable plane-wave wavenumber |k|.

Source code in src/serval/utils.py
def plane_wave_mag_from_bandlimit(
    lmax: int, threshold: float = SPARSE_THRESHOLD
) -> float:
    r"""Computes the maximum usable plane-wave wavenumber |k| that can be represented
    on a spherical-harmonic grid of bandlimit lmax.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree for band-limiting.
    threshold : float, optional
        The threshold for determining the plane wave magnitude from the bandlimit.
    Returns
    -------
    float
        The maximum usable plane-wave wavenumber |k|.
    """
    at_bandlimit = np.abs(analytic_plane_wave_al(lmax, lmax))

    def to_opt(mag_k):
        return np.log10(
            np.abs(analytic_plane_wave_al(lmax, mag_k)) / at_bandlimit
        ) - np.log10(threshold)

    with np.errstate(divide="ignore"):
        return brentq(to_opt, 0, lmax)

plane_waves(ks, template)

Produces a band-limited spherical plane wave evaluated on the grid from wavevectors ks.

Parameters:

Name Type Description Default
ks list[NDArray[float64]]

A list of wavevector arrays. Each array should have shape (3,).

required
template SHGrid

A pyshtools SHGrid object that serves as a template for the output grid.

required

Returns:

Type Description
SHGrid

A pyshtools SHGrid object containing the plane wave data.

Source code in src/serval/utils.py
def plane_waves(
    ks: list[npt.NDArray[np.float64]], template: pysh.SHGrid
) -> pysh.SHGrid:
    r"""Produces a band-limited spherical plane wave evaluated on the grid from wavevectors ks.
    Parameters
    ----------
    ks : list[npt.NDArray[np.float64]]
        A list of wavevector arrays. Each array should have shape (3,).
    template : pysh.SHGrid
        A pyshtools SHGrid object that serves as a template for the output grid.
    Returns
    -------
    pysh.SHGrid
        A pyshtools SHGrid object containing the plane wave data."""
    theta, phi = np.meshgrid(
        np.pi / 2 - template.lats(degrees=False),
        template.lons(degrees=False),
        indexing="ij",
    )
    nhat = spherical_to_normal(theta, phi)
    out_grid = template.copy()
    tot_k = np.sum(ks, axis=0)
    if np.linalg.norm(tot_k) > template.lmax:
        raise ValueError(
            f"Harmonic order of grid is too low to sample plane wave at bandlimit. "
            f"|k| = {np.linalg.norm(tot_k):.4f} > {template.lmax:.4f}"
        )
    out_grid.data = np.exp(1j * (nhat @ tot_k))
    return out_grid

plane_waves_integral(ks)

Computes the integral over the unit sphere of plane waves with wavevectors ks.

\[ \int_{S^2} e^{\,i\,\mathbf{k}\cdot\mathbf{n}}\, d\Omega = 4\pi\, \frac{\sin\lvert \mathbf{k}\rvert}{\lvert \mathbf{k}\rvert} \]

Parameters:

Name Type Description Default
ks list[NDArray[float64]]

A list of wavevector arrays. Each array should have shape (..., 3), where ... represents any number of leading dimensions.

required

Returns:

Type Description
float | NDArray[float64]

The integral of the product of plane waves over the unit sphere. The output shape is the broadcasted shape of the input wavevector arrays without the last dimension.

Source code in src/serval/utils.py
def plane_waves_integral(
    ks: list[npt.NDArray[np.float64]],
) -> float | npt.NDArray[np.float64]:
    r"""Computes the integral over the unit sphere of plane waves with wavevectors ks.

    $$
    \int_{S^2} e^{\,i\,\mathbf{k}\cdot\mathbf{n}}\, d\Omega
        = 4\pi\, \frac{\sin\lvert \mathbf{k}\rvert}{\lvert \mathbf{k}\rvert}
    $$

    Parameters
    ----------
    ks : list[npt.NDArray[np.float64]]
        A list of wavevector arrays. Each array should have shape (..., 3), where
        ... represents any number of leading dimensions.
    Returns
    -------
    float | npt.NDArray[np.float64]
        The integral of the product of plane waves over the unit sphere. The output
        shape is the broadcasted shape of the input wavevector arrays without the last
        dimension.
    """
    # Can be broadcastable vector stacks with xyz on -1 axis.
    total_k = np.sum(np.array(np.broadcast_arrays(*ks)), axis=0)
    mag_k = np.linalg.norm(total_k, axis=-1)
    return 4 * np.pi * np.sinc(mag_k / np.pi)

plane_waves_mmodes(ks, mmax)

Computes the m-modes of a product of plane waves by rotating one of the k-vectors (the first one) through all azimuthal angles to create visibilities.

Parameters:

Name Type Description Default
ks list[NDArray[float64]]

A list of wavevector arrays. Each array should have shape (3,).

required
mmax int

The maximum absolute m-mode index.

required

Returns:

Type Description
NDArray[complex128]

The m-modes of the product of plane waves, with shape (2 * mmax + 1,).

Source code in src/serval/utils.py
def plane_waves_mmodes(
    ks: list[npt.NDArray[np.float64]], mmax: int
) -> npt.NDArray[np.complex128]:
    r"""Computes the m-modes of a product of plane waves by rotating one of the k-vectors
    (the first one) through all azimuthal angles to create visibilities.
    Parameters
    ----------
    ks : list[npt.NDArray[np.float64]]
        A list of wavevector arrays. Each array should have shape (3,).
    mmax : int
        The maximum absolute m-mode index.
    Returns
    -------
    npt.NDArray[np.complex128]
        The m-modes of the product of plane waves, with shape (2 * mmax + 1,).
    """
    # Rotates the first plane wave relative to the others.
    eras = np.linspace(0, 2 * np.pi, 2 * mmax + 1, endpoint=False)
    # Rotate k0 from cirs to tirs basis (sky-like)
    rot_k0s = np.array([tirs_to_cirs(era).inv().apply(ks[0]) for era in eras])
    pw_vis = plane_waves_integral(
        [
            rot_k0s,
        ]
        + ks[1:]
    )
    mmodes = visibilities_to_mmodes(pw_vis)
    return mmodes

pointed_theta_phi(lmax, latitude, longitude, altitude, azimuth, boresight)

Compute TIRS-grid coordinates expressed in the pointing frame.

For each point on the TIRS spherical harmonic grid (determined by lmax), computes the colatitude and azimuth as seen in the frame of the dish pointing direction. This is the coordinate map needed to evaluate a beam pattern (defined in pointing coordinates) over the whole sky grid.

Parameters:

Name Type Description Default
lmax int

Band-limit of the spherical harmonic grid.

required
latitude float

Geodetic latitude of the observer in radians.

required
longitude float

Geodetic longitude of the observer in radians.

required
altitude float

Altitude of the observer in metres.

required
azimuth float

Dish azimuth in radians (North-through-East).

required
boresight float

Boresight rotation angle in radians.

required

Returns:

Name Type Description
pointing_theta NDArray[float64]

Colatitude in the pointing frame for every TIRS grid point, shape (nlat, nlon).

pointing_phi NDArray[float64]

Azimuth in the pointing frame for every TIRS grid point, shape (nlat, nlon).

Source code in src/serval/utils.py
def pointed_theta_phi(
    lmax: int,
    latitude: float,
    longitude: float,
    altitude: float,
    azimuth: float,
    boresight: float,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
    r"""Compute TIRS-grid coordinates expressed in the pointing frame.

    For each point on the TIRS spherical harmonic grid (determined by
    `lmax`), computes the colatitude and azimuth as seen in the frame
    of the dish pointing direction.  This is the coordinate map needed
    to evaluate a beam pattern (defined in pointing coordinates) over
    the whole sky grid.

    Parameters
    ----------
    lmax : int
        Band-limit of the spherical harmonic grid.
    latitude : float
        Geodetic latitude of the observer in radians.
    longitude : float
        Geodetic longitude of the observer in radians.
    altitude : float
        Altitude of the observer in metres.
    azimuth : float
        Dish azimuth in radians (North-through-East).
    boresight : float
        Boresight rotation angle in radians.

    Returns
    -------
    pointing_theta : npt.NDArray[np.float64]
        Colatitude in the pointing frame for every TIRS grid point,
        shape ``(nlat, nlon)``.
    pointing_phi : npt.NDArray[np.float64]
        Azimuth in the pointing frame for every TIRS grid point,
        shape ``(nlat, nlon)``.
    """
    grid = grid_template(lmax)
    tirs_theta, tirs_phi = grid_theta_phi(grid, meshgrid=True)
    tirs_normals = spherical_to_normal(tirs_theta, tirs_phi)
    rot_tirs_to_pointing = tirs_to_pointing(
        latitude=latitude,
        longitude=longitude,
        altitude=altitude,
        azimuth=azimuth,
        boresight=boresight,
    ).as_matrix()
    pointing_normals = (rot_tirs_to_pointing @ tirs_normals[..., None]).squeeze()
    pointing_theta = np.arccos(pointing_normals[..., 2])
    pointing_phi = np.arctan2(pointing_normals[..., 1], pointing_normals[..., 0])
    return pointing_theta, pointing_phi

power_law_sky(lmax, power_law_index=2.0, seed=None)

Generates a random sky realization with a power-law angular power spectrum.

Parameters:

Name Type Description Default
lmax int

The maximum spherical harmonic degree.

required
power_law_index float

The index of the power-law angular power spectrum. Default is 2.0.

2.0
seed int | None

The random seed for reproducibility. Default is None.

None

Returns:

Type Description
SHGrid

The generated random sky realization as a pyshtools spherical harmonic grid.

Source code in src/serval/utils.py
def power_law_sky(
    lmax: int, power_law_index: float = 2.0, seed: int | None = None
) -> pysh.SHGrid:
    r"""Generates a random sky realization with a power-law angular power spectrum.
    Parameters
    ----------
    lmax : int
        The maximum spherical harmonic degree.
    power_law_index : float, optional
        The index of the power-law angular power spectrum. Default is 2.0.
    seed : int | None, optional
        The random seed for reproducibility. Default is None.
    Returns
    -------
    pysh.SHGrid
        The generated random sky realization as a pyshtools spherical harmonic grid.
    """
    ells = np.arange(lmax + 1, dtype=float)
    power = np.zeros(lmax + 1, dtype=float)
    power[1:] = ells[1:] ** (-power_law_index)
    pl_sky_alm = stack_ms(
        pysh.SHCoeffs.from_random(
            power, kind="complex", seed=seed, **SHT_CONVENTIONS
        ).coeffs
    )
    pl_sky_grid = array_synthesis(pl_sky_alm)
    pl_sky_grid.data.imag = 0.0  # Make sky real.
    return pl_sky_grid

spherical_tangent_vectors(theta, phi)

Computes the tangent basis vectors at the given spherical coordinates.

Parameters:

Name Type Description Default
theta NDArray[float64]

The polar angles in radians.

required
phi NDArray[float64]

The azimuthal angles in radians.

required

Returns:

Type Description
tuple[NDArray[float64], NDArray[float64]]

The unit tangent vectors (theta_hat, phi_hat) in the directions of increasing theta and increasing phi, respectively.

Source code in src/serval/utils.py
def spherical_tangent_vectors(
    theta: npt.NDArray[np.float64], phi: npt.NDArray[np.float64]
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
    r"""Computes the tangent basis vectors at the given spherical coordinates.
    Parameters
    ----------
    theta : npt.NDArray[np.float64]
        The polar angles in radians.
    phi : npt.NDArray[np.float64]
        The azimuthal angles in radians.
    Returns
    -------
    tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]
        The unit tangent vectors (theta_hat, phi_hat) in the directions of
        increasing theta and increasing phi, respectively.
    """
    ct, st = np.cos(theta), np.sin(theta)
    cp, sp = np.cos(phi), np.sin(phi)
    theta_hat = np.stack([ct * cp, ct * sp, -st], axis=-1)
    phi_hat = np.stack([-sp, cp, np.zeros_like(phi)], axis=-1)
    return theta_hat, phi_hat

spherical_to_normal(theta, phi)

Transforms arrays of spherical coordinates to arrays of Cartesian unit normal vectors.

Parameters:

Name Type Description Default
theta NDArray[float64]

The polar angles in radians.

required
phi NDArray[float64]

The azimuthal angles in radians.

required

Returns:

Type Description
NDArray[float64]

The Cartesian unit normal vectors corresponding to the input spherical angles.

Source code in src/serval/utils.py
def spherical_to_normal(
    theta: npt.NDArray[np.float64], phi: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
    r"""Transforms arrays of spherical coordinates to arrays of Cartesian unit normal vectors.
    Parameters
    ----------
    theta : npt.NDArray[np.float64]
        The polar angles in radians.
    phi : npt.NDArray[np.float64]
        The azimuthal angles in radians.
    Returns
    -------
    npt.NDArray[np.float64]
        The Cartesian unit normal vectors corresponding to the input spherical angles.
    """
    st = np.sin(theta)
    return np.stack([np.cos(phi) * st, np.sin(phi) * st, np.cos(theta)], axis=-1)

visibilities_at_eras(mmodes, m_values, era_deg)

Evaluate a visibility timestream at arbitrary ERA positions via NUDFT.

Reconstructs visibilities at arbitrary Earth Rotation Angle values using the stored m-mode amplitudes. The evaluation is exact for band-limited signals (no aliasing up to the Nyquist m-value) and periodic with period 360°.

The relationship between m-modes and visibilities is::

vis(f, φ) = Σ_m  A_m(f) · exp(2πi · m · φ / 360)

where A_m = mmodes[..., mmax + m] (fftshifted storage).

Parameters:

Name Type Description Default
mmodes (ndarray, shape(..., n_era))

M-mode array in fftshifted order — m=0 is at index mmax = (n_era - 1) // 2. May be a zarr.Array; it will be loaded eagerly via np.asarray.

required
m_values ndarray of int, shape (n_m,)

M-values of the non-zero entries in mmodes.

required
era_deg (ndarray, shape(n_t))

ERA values in degrees at which to evaluate. Values outside [0, 360) are handled correctly via the periodicity of the complex exponential.

required

Returns:

Type Description
(ndarray, shape(..., n_t))

Reconstructed visibilities at the requested ERA positions.

Source code in src/serval/utils.py
def visibilities_at_eras(
    mmodes: npt.NDArray[np.complex128],
    m_values: npt.NDArray[np.int_],
    era_deg: npt.NDArray[np.float64],
) -> npt.NDArray[np.complex128]:
    r"""Evaluate a visibility timestream at arbitrary ERA positions via NUDFT.

    Reconstructs visibilities at arbitrary Earth Rotation Angle values using the
    stored m-mode amplitudes.  The evaluation is exact for band-limited signals
    (no aliasing up to the Nyquist m-value) and periodic with period 360°.

    The relationship between m-modes and visibilities is::

        vis(f, φ) = Σ_m  A_m(f) · exp(2πi · m · φ / 360)

    where ``A_m = mmodes[..., mmax + m]`` (fftshifted storage).

    Parameters
    ----------
    mmodes : ndarray, shape (..., n_era)
        M-mode array in fftshifted order — m=0 is at index ``mmax = (n_era - 1) // 2``.
        May be a ``zarr.Array``; it will be loaded eagerly via ``np.asarray``.
    m_values : ndarray of int, shape (n_m,)
        M-values of the non-zero entries in ``mmodes``.
    era_deg : ndarray, shape (n_t,)
        ERA values in degrees at which to evaluate. Values outside ``[0, 360)``
        are handled correctly via the periodicity of the complex exponential.

    Returns
    -------
    ndarray, shape (..., n_t)
        Reconstructed visibilities at the requested ERA positions.
    """
    mmax = (mmodes.shape[-1] - 1) // 2
    amplitudes = np.asarray(mmodes)[..., m_values + mmax]          # (..., n_m)
    phases = np.exp(2j * np.pi * m_values * era_deg[:, np.newaxis] / 360.0)  # (n_t, n_m)
    return amplitudes @ phases.swapaxes(-1, -2)  # (..., n_t)

visibilities_to_mmodes(vis)

Compute per-sample m-mode amplitudes from a visibility timestream.

Applies an FFT along the last axis and normalises by n_times so the returned coefficients are the Fourier-series amplitudes \(A_m\) satisfying

\[ \mathrm{vis}(\phi) = \sum_m A_m \exp\!\left(2 \pi i m \phi / 360\right). \]

The output is fftshifted: index n_times // 2 + m corresponds to m-mode m.

Parameters:

Name Type Description Default
vis NDArray[complex128]

Input visibilities with shape (..., n_times).

required

Returns:

Type Description
NDArray[complex128]

M-mode amplitudes with shape (..., n_times). The last axis is fftshifted so that m-values run from -n_times//2 to n_times//2 - 1 (even n_times) or (n_times-1)//2 (odd).

Source code in src/serval/utils.py
def visibilities_to_mmodes(
    vis: npt.NDArray[np.complex128],
) -> npt.NDArray[np.complex128]:
    r"""Compute per-sample m-mode amplitudes from a visibility timestream.

    Applies an FFT along the last axis and normalises by ``n_times`` so the
    returned coefficients are the Fourier-series amplitudes $A_m$ satisfying

    $$
    \mathrm{vis}(\phi) = \sum_m A_m \exp\!\left(2 \pi i m \phi / 360\right).
    $$

    The output is fftshifted: index ``n_times // 2 + m`` corresponds to
    m-mode ``m``.

    Parameters
    ----------
    vis : npt.NDArray[np.complex128]
        Input visibilities with shape ``(..., n_times)``.
    Returns
    -------
    npt.NDArray[np.complex128]
        M-mode amplitudes with shape ``(..., n_times)``.  The last axis is
        fftshifted so that m-values run from ``-n_times//2`` to
        ``n_times//2 - 1`` (even ``n_times``) or ``(n_times-1)//2`` (odd).
    """
    return np.fft.fftshift(np.fft.fft(vis, axis=-1), axes=-1) / vis.shape[-1]

Core Gaunt Coefficient Utilities (serval.gaunt.core.py)

gaunt_dot12(alm1, alm2, l3max, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute a projector for alm3 for the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over l1, l2, m2 and m1 (if requested).

TODO add formula.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

Spherical harmonic co-efficients in l1, m1.

required
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
l3max int

Maximum l3 to compute co-efficients up to.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128]

m1-mode alm3 projector as a numpy array of shape (Nm1, l3max+1, 2l3max+1) if sum_m1 is False or (l3max+1, 2l3max+1) is sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot12(
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    l3max: int,
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128]:
    """Compute a projector for alm3 for the integral of the triple product
    of spherical harmonics by computing the Gaunt co-efficients in-place
    and sum-producting over l1, l2, m2 and m1 (if requested).

    TODO add formula.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l1, m1.
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    l3max : int
        Maximum l3 to compute co-efficients up to.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128]
        m1-mode alm3 projector as a numpy array of shape (Nm1, l3max+1, 2*l3max+1)
        if sum_m1 is False or (l3max+1, 2*l3max+1) is sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = alm1.shape[0]
    nm1 = len(_gaunt.m1_indexing(alm1.shape[0] - 1, absm1_lower, absm1_upper)[0])
    result = empty_complex_array((nm1, l3max + 1, 2 * l3max + 1))
    _gaunt.inplace_dot12(alm1, alm2, result, l3max, absm1_lower, absm1_upper)
    if sum_m1:
        return np.sum(result, axis=0)
    else:
        return result

gaunt_dot123(alm1, alm2, alm3, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over all harmonic degrees and orders except m1, unless requested.

TODO add formula.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

Spherical harmonic co-efficients in l1, m1.

required
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
alm3 NDArray[complex128]

Spherical harmonic co-efficients in l3, m3.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128] | float

Numpy array, shape (Nm1), of m1-modes or their sum if sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot123(
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128] | float:
    """Compute the integral of the triple product of spherical harmonics
    by computing the Gaunt co-efficients in-place and sum-producting over all
    harmonic degrees and orders except m1, unless requested.

    TODO add formula.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l1, m1.
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    alm3 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l3, m3.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128] | float
        Numpy array, shape (Nm1), of m1-modes or their sum if sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = alm1.shape[0]
    nm1 = len(_gaunt.m1_indexing(alm1.shape[0] - 1, absm1_lower, absm1_upper)[0])
    result = empty_complex_array(nm1)
    _gaunt.inplace_dot123(alm1, alm2, alm3, result, absm1_lower, absm1_upper)
    if sum_m1:
        return result.sum()
    else:
        return result

gaunt_dot23(alm2, alm3, l1max, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute a projector for alm1 for the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over l2, m2, l3, m3 and m1 (if requested).

TODO add formula.

Parameters:

Name Type Description Default
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
alm3 NDArray[complex128]

Spherical harmonic co-efficients in l3, m3.

required
l1max int

Maximum l1 to compute co-efficients up to.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128]

m1-mode alm1 projector as a numpy array of shape (Nm1, l1max+1) if sum_m1 is False or (l1max+1,) is sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot23(
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    l1max: int,
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128]:
    """Compute a projector for alm1 for the integral of the triple product
    of spherical harmonics by computing the Gaunt co-efficients in-place
    and sum-producting over l2, m2, l3, m3 and m1 (if requested).

    TODO add formula.

    Parameters
    ----------
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    alm3 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l3, m3.
    l1max : int
        Maximum l1 to compute co-efficients up to.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128]
        m1-mode alm1 projector as a numpy array of shape (Nm1, l1max+1)
        if sum_m1 is False or (l1max+1,) is sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = l1max + 1
    nm1 = len(_gaunt.m1_indexing(l1max, absm1_lower, absm1_upper)[0])
    result = empty_complex_array((nm1, l1max + 1))
    _gaunt.inplace_dot23(alm2, alm3, result, l1max, absm1_lower, absm1_upper)
    if sum_m1:
        return np.sum(result, axis=0)
    else:
        return result

gaunts_coo(l1max, l2max, l3max)

Compute a sparse representation of all Gaunt co-efficients up to maximum harmonic degrees l1max, l2max and l3max.

Parameters:

Name Type Description Default
l1max int

Maximum harmonic degree l1.

required
l2max int

Maximum harmonic degree l2.

required
l3max int

Maximum harmonic degree l3.

required

Returns:

Type Description
COO

COO formatted sparse array of the Gaunt co-efficients.

Source code in src/serval/gaunt/core.py
def gaunts_coo(l1max: int, l2max: int, l3max: int) -> sparse.COO:
    """Compute a sparse representation of all Gaunt co-efficients up
    to maximum harmonic degrees l1max, l2max and l3max.

    Parameters
    ----------
    l1max : int
        Maximum harmonic degree l1.
    l2max : int
        Maximum harmonic degree l2.
    l3max : int
        Maximum harmonic degree l3.

    Returns
    -------
    sparse.COO
        COO formatted sparse array of the Gaunt co-efficients.
    """
    coords, data, shape = _gaunt.gaunts_coo(l1max, l2max, l3max)
    coords = np.array(coords)
    coords = coords.view(dtype=coords.dtype[0]).reshape(-1, 6).T
    data = np.array(data)
    return sparse.COO(
        coords=coords,
        data=data,
        shape=shape,
        has_duplicates=False,
        sorted=True,
        fill_value=0.0,
    )

gaunts_l1m1_coo(l1, absm1, l1max, l2max, l3max)

Compute a sparse representation of all Gaunt co-efficients with given harmonic degree l1 and harmonic order |m1|, up to maximum harmonic degrees l1max, l2max and l3max. The shape of the coordinates is the same as for the full gaunt coeffcients (gaunts_coo). The only difference is that the first two indices are l1 and m1, set to the input values.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
absm1 int

Harmonic order |m1|.

required
l1max int

Maximum harmonic degree l1.

required
l2max int

Maximum harmonic degree l2.

required
l3max int

Maximum harmonic degree l3.

required

Returns:

Type Description
COO

COO formatted sparse array of the Gaunt co-efficients.

Source code in src/serval/gaunt/core.py
def gaunts_l1m1_coo(
    l1: int, absm1: int, l1max: int, l2max: int, l3max: int
) -> sparse.COO:
    """Compute a sparse representation of all Gaunt co-efficients with given harmonic degree
    l1 and harmonic order |m1|, up to maximum harmonic degrees l1max, l2max and l3max. The shape
    of the coordinates is the same as for the full gaunt coeffcients (gaunts_coo).
    The only difference is that the first two indices are l1 and m1, set to the input values.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    absm1 : int
        Harmonic order |m1|.
    l1max : int
        Maximum harmonic degree l1.
    l2max : int
        Maximum harmonic degree l2.
    l3max : int
        Maximum harmonic degree l3.

    Returns
    -------
    sparse.COO
        COO formatted sparse array of the Gaunt co-efficients.
    """
    # Why is l1max needed...?
    coords, data, shape = _gaunt.gaunts_l1m1_coo(l1, absm1, l1max, l2max, l3max)
    coords = np.array(coords)
    coords = coords.view(dtype=coords.dtype[0]).reshape(-1, 6).T
    data = np.array(data)
    return sparse.COO(
        coords=coords,
        data=data,
        shape=shape,
        has_duplicates=False,
        sorted=True,
        fill_value=0.0,
    )

integrator12_contract3(int12, contract3)

(m1 l3 m3) (l3 m3' m3) -> (m1 l3 m3')

Source code in src/serval/gaunt/core.py
def integrator12_contract3(
    int12: npt.NDArray[np.complex128], contract3: npt.NDArray[np.complex128]
) -> npt.NDArray[np.complex128]:
    """(m1 l3 m3) (l3 m3' m3) -> (m1 l3 m3')"""
    # Check m3' axis smaller than m3 axis
    if int12.ndim == 3:  # Has no frequency axis
        result = empty_complex_array((int12.shape[0], contract3.shape[0], contract3.shape[1]))
        _contract.integrator12_contract3(int12, contract3, result)
        return result
    elif int12.ndim == 4:  # Has frequency axis
        result = empty_complex_array(
            (int12.shape[0] * int12.shape[1], contract3.shape[0], contract3.shape[1])
        )
        _contract.integrator12_contract3(
            int12.reshape((-1,) + int12.shape[2:]), contract3, result
        )
        return result.reshape(
            (int12.shape[0], int12.shape[1], contract3.shape[0], contract3.shape[1])
        )
    else:
        raise ValueError

single_gaunt(l1, l2, l3, m1, m2)

Computes a single Gaunt co-efficient for given harmonic degrees and orders. This uses Wigner-3j family algorithms so it is not efficient for computing many Gaunts co-efficients. Primarily for testing purposes m3 is determined by m1 + m2 + m3 = 0.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m1 int

Harmonic order m1.

required
m2 int

Harmonic order m2.

required

Returns:

Type Description
float

Computed Gaunt co-efficient.

Source code in src/serval/gaunt/core.py
def single_gaunt(l1: int, l2: int, l3: int, m1: int, m2: int) -> float:
    """Computes a single Gaunt co-efficient for given harmonic degrees
    and orders. This uses Wigner-3j family algorithms so it is not
    efficient for computing many Gaunts co-efficients. Primarily for
    testing purposes m3 is determined by m1 + m2 + m3 = 0.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m1 : int
        Harmonic order m1.
    m2 : int
        Harmonic order m2.

    Returns
    -------
    float
        Computed Gaunt co-efficient.
    """
    triangle = abs(l1 - l3) <= l2 <= abs(l1 + l3)
    parity = (l1 + l2 + l3) % 2 != 0
    if (
        not triangle
        or l1 < 0
        or l2 < 0
        or l3 < 0
        or abs(m1) > l1
        or abs(m2) > l2
        or abs(m1 + m2) > l3
        or parity      # parity condition: the sum of the harmonic degrees must be even
    ):
        return 0.0
    return _gaunt.single_gaunt(l1, l2, l3, m1, m2)

wigner_3jj(l2, l3, m2, m3)

Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3 for harmonic degrees m1, m2, m3 = -m1 -m2.

Parameters:

Name Type Description Default
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m2 int

Harmonic order m2.

required
m3 int

Harmonic order m3.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic degree l1min and array of Wigner-3j values of length l1max - l1min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jj(
    l2: int, l3: int, m2: int, m3: int
) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3
    for harmonic degrees m1, m2, m3 = -m1 -m2.

    Parameters
    ----------
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m2 : int
        Harmonic order m2.
    m3 : int
        Harmonic order m3.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic degree l1min and array of Wigner-3j values of
        length l1max - l1min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    # In case there is an error in the result, pyshtools Wigner3j can also be used.
    # However, it carries extra zeros that need to be removed:
    # Wigner_3j = pysh.utils.Wigner3j(l2, l3, -m2 - m3, m2, m3)[0]
    # lmin = max(abs(l2 - l3), abs(-m2 -m3))
    # lmax = l2 + l3
    # lnum = lmax - lmin + 1
    # return lmin, Wigner_3j[: lnum + 1 :]
    if m2 == 0 and m3 == 0:
        return wigner_3jj_000(l2, l3)
    if l2 < 0 or l3 < 0 or abs(m2) > l2 or abs(m3) > l3:
        return (-1, np.array([]))
    result = _wigner.wigner_3jj_nochecks(l2, l3, m2, m3)
    return result[0], np.array(result[1])

wigner_3jj_000(l2, l3)

Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3 for harmonic degrees m1 = m2 = m3 = 0.

Parameters:

Name Type Description Default
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic order l1min and array of Wigner-3j values of length l1max - l1min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jj_000(l2: int, l3: int) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3
    for harmonic degrees m1 = m2 = m3 = 0.

    Parameters
    ----------
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic order l1min and array of Wigner-3j values of
        length l1max - l1min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    if l2 < 0 or l3 < 0:
        return (-1, np.array([]))
    result = _wigner.wigner_3jj_000_nochecks(l2, l3)
    return result[0], np.array(result[1])

wigner_3jm(l1, l2, l3, m1)

Compute the family of Wigner-3j terms for harmonic degrees l1, l2 and l3 all non-zero terms with harmonic order m1.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m1 int

Harmonic order m1.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic order m2min and array of Wigner-3j values of length l2max - l2min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jm(
    l1: int, l2: int, l3: int, m1: int
) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of Wigner-3j terms for harmonic degrees l1, l2 and l3
    all non-zero terms with harmonic order m1.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m1 : int
        Harmonic order m1.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic order m2min and array of Wigner-3j values of
        length l2max - l2min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    triangle = abs(l1 - l3) <= l2 <= abs(l1 + l3)
    if (
        not triangle
        or l1 < 0
        or l2 < 0
        or l3 < 0
        or abs(m1) > l1
        or not (abs(l1 - l3) < l2 < l1 + l3)
    ):
        return (-1, np.array([]))
    result = _wigner.wigner_3jm_nochecks(l1, l2, l3, m1)
    return result[0], np.array(result[1])