Skip to content

Serval

Main Library

Core.py

PowerBeamVisProjector

Bases: VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
power_beam_lmax int
required
frequencies_MHz ndarray
required
baseline_lmax int | None
None
sky_lmax int | None
None
sky_absm_limits tuple[int, Optional[int]]
(0, None)
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
generate_pointing_contractor_on_init bool
True
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt']
'channel'
pointing_contract bool
False
pointing_altitude float | None
None
pointing_azimuth float | None
None
pointing_boresight float | None
None
pointed_beam_mmax int | None
None

Attributes:

Name Type Description
pointing_contractor ndarray | None
Source code in src/serval/core.py
@define(eq=False)
class PowerBeamVisProjector(VisProjector):
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    power_beam_lmax: int
    frequencies_MHz: np.ndarray

    baseline_lmax: int | None = None
    sky_lmax: int | None = None
    sky_absm_limits: tuple[int, int | None] = (0, None)
    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    generate_pointing_contractor_on_init: bool = True
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel"
    pointing_contract: bool = False
    pointing_altitude: float | None = None
    pointing_azimuth: float | None = None
    pointing_boresight: float | None = None
    pointed_beam_mmax: int | None = None

    pointing_contractor: np.ndarray | None = field(init=False, default=None)

    _to_attrs: ClassVar[list[str]] = [
        "latitude",
        "longitude",
        "power_beam_lmax",
        "baseline_lmax",
        "sky_lmax",
        "baseline_enu",
        "pointing_contract",
        "pointing_altitude",
        "pointing_azimuth",
        "pointing_boresight",
        "pointed_beam_mmax",
    ]
    _to_zarr_store: ClassVar[list[str]] = ["frequencies_MHz"]
    _opt_tag: ClassVar[str] = "opt12"

    def __attrs_post_init__(self):
        if self.generate_baseline_cache_on_init:
            self.generate_baseline_cache()
        if self.sky_lmax is None:
            if self.baseline_lmax is not None:
                if self.sky_lmax != self.baseline_lmax + self.power_beam_lmax:
                    warnings.warn(
                        "The value of sky_lmax is changed to the sum of baseline_lmax and "
                        "power_beam_lmax.",
                        stacklevel=2,  # points to caller
                    )
                    self.sky_lmax = self.power_beam_lmax + self.baseline_lmax
            else:
                raise ValueError(
                    "Cannot determine sky_lmax with inputs given.\n"
                    "Either baseline_cache needs to be computed on input, "
                    "baseline_lmax must be specified or sky_lmax must be specified."
                )
        if self.generate_gaunt_cache_on_init:
            self.generate_gaunt_cache()
        else:
            self.update_integrator()
        if self.pointing_contract and self.generate_pointing_contractor_on_init:
            if self.pointed_beam_mmax is None:
                raise ValueError(
                    "Cannot determine pointing_contractor with the inputs given, "
                    "because pointed_beam_mmax is None."
                )
            if self.pointed_beam_mmax > self.power_beam_lmax:
                raise ValueError(
                    "pointed_beam_mmax cannot be larger than power_beam_lmax."
                )
            self.generate_pointing_contractor()

    def generate_pointing_contractor(self):
        self.pointing_contractor = wigner_d(
            self.power_beam_lmax,
            self.pointed_beam_mmax,
            tirs_to_pointing(
                latitude=self.latitude,
                longitude=self.longitude,
                altitude=self.pointing_altitude,
                azimuth=self.pointing_azimuth,
                boresight=self.pointing_boresight,
            ),
        )

    def generate_baseline_cache(self):
        self.baseline_cache = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=self.frequencies_MHz,
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
        )
        # Can assert this is the same as input if not none.
        self.baseline_lmax = self.baseline_cache.shape[1] - 1
        if self.sky_lmax != self.baseline_lmax + self.power_beam_lmax:
            if self.sky_lmax is not None:
                warnings.warn(
                    "The value of sky_lmax is changed to the sum of baseline_lmax and "
                    "power_beam_lmax.",
                    stacklevel=2,  # points to caller
                )
            self.sky_lmax = self.baseline_lmax + self.power_beam_lmax

    def beam_linear_mmode_generator(self, sky_alms, release_gaunt_cache: bool = True):
        sky_alms = extend_dimensions_if_one_batch(sky_alms, 3)
        if sky_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The sky_alms coefficients do not correspond to the number of frequency bins."
            )
        if sky_alms.shape[1] != self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the expected sky_lmax "
                "(baseline_lmax + power_beam_lmax)."
            )
        if sky_alms.shape[2] != 2 * self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the full harmonic orders "
                "of sky_lmax (baseline_lmax + power_beam_lmax) -> (ms = 2 * sky_lmax + 1)."
            )
        if self.baseline_cache is not None:
            self.generate_baseline_cache()
        if self.baseline_cache is None:
            raise ValueError("baseline_cache is None. It must be generated beforehand.")
        if self.pointing_contract and self.pointing_contractor is None:
            self.generate_pointing_contractor()
        self.triple_sh_integrator.generate_integrator_cache_12(
            sky_alms,
            self.baseline_cache,
            contract3=self.pointing_contractor,
            release_gaunt_cache=release_gaunt_cache,
            batch_parallel_mode=self.batch_parallel_mode,
        )
        self.integrator_cache = self.triple_sh_integrator.linear_integrator_cache_12
        return self.integrator_cache

    def mmodes(self, beam_alms, sky_alms=None):
        beam_alms = extend_dimensions_if_one_batch(beam_alms, 3)
        if beam_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the number of frequency bins."
            )
        if (
            beam_alms.shape[1] != self.power_beam_lmax + 1
        ):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the inputted power_beam_lmax."
            )
        if self.pointing_contract:
            if beam_alms.shape[2] != 2 * self.pointed_beam_mmax + 1:
                raise ValueError(
                    "The beam_alms coefficients do not correspond to the expected "
                    "pointed_beam_mmax."
                )
        else:
            if beam_alms.shape[2] != 2 * self.power_beam_lmax + 1:
                raise ValueError(
                    "The beam_alms coefficients do not correspond to the full harmonic orders "
                    "of power_beam_lmax -> (ms = 2 * power_beam_lmax + 1)."
                )
        if sky_alms is None and self.integrator_cache is None:
            raise ValueError("Must specify either sky_alms or integrator cache.")
        else:
            if sky_alms is None:
                integrator_cache = self.integrator_cache
            else:
                integrator_cache = self.beam_linear_mmode_generator(sky_alms=sky_alms)
            integrator_cache = extend_dimensions_if_one_batch(integrator_cache, 4)
            return self.triple_sh_integrator.batch_gaunt_integrate_cached_12(
                beam_alms, integrator_cache=integrator_cache
            )

    def visibilities(
        self,
        beam_alms,
        sky_alms=None,
    ):
        mmodes = self.mmodes(beam_alms=beam_alms, sky_alms=sky_alms)
        return mmodes_to_visibilities(mmodes, self.sky_lmax, ms=self.sky_m_values)

    def setup_zarr_store(
        self,
        store_location: str | Path,
        frequency_chunksize: int | None = None,
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        if self.pointing_contract:
            # if self.pointed_beam_mmax is None:
            #     raise ValueError("pointed_beam_mmax is None.")
            num_beam_ms = 2 * self.pointed_beam_mmax + 1
        else:
            num_beam_ms = 2 * self.power_beam_lmax + 1
        integrator_cache_shape = (
            self.frequencies_MHz.size,
            2 * self.sky_lmax + 1,
            self.power_beam_lmax + 1,
            num_beam_ms,
        )
        integrator_cache_chunks = (
            (
                frequency_chunksize
                if frequency_chunksize is not None
                else self.frequencies_MHz.size
            ),
            1,  # per m-chunking for parallel writes
            self.power_beam_lmax + 1,
            num_beam_ms,
        )
        self._setup_zarr_store(
            store_location=store_location,
            group_path=group_path,
            integrator_cache_shape=integrator_cache_shape,
            integrator_cache_chunks=integrator_cache_chunks,
            store_metadata_attrs=store_metadata_attrs,
        )

SingleVisibilityGenerator

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
frequency_MHz float
required
power_beam_alm ndarray[tuple[Any, ...], dtype[complex128]]
required
sky_alm ndarray[tuple[Any, ...], dtype[complex128]]
required
method Literal['gaunt', 'gaunt_sparse', 'grid']
'gaunt'
baseline_lmax int | None
None
sky_absm_limits tuple[int, Optional[int]]
(0, None)
baseline_alm None | ndarray[tuple[Any, ...], dtype[complex128]]
None

Attributes:

Name Type Description
sky_lmax int
power_beam_lmax int
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class SingleVisibilityGenerator:
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    frequency_MHz: float
    power_beam_alm: npt.NDArray[np.complex128]
    sky_alm: npt.NDArray[np.complex128]

    method: Literal["gaunt", "gaunt_sparse", "grid"] = "gaunt"
    baseline_lmax: int | None = None

    sky_absm_limits: tuple[int, int | None] = (0, None)

    baseline_alm: None | npt.NDArray[np.complex128] = None

    sky_lmax: int = field(init=False)
    power_beam_lmax: int = field(init=False)
    triple_sh_integrator: TripleSHIntegrator = field(init=False)

    def generate_baseline_cache(self):
        self.baseline_alm = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=np.atleast_1d(self.frequency_MHz)[None, :],
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
        )[
            0, 0, ...
        ]  # TODO Figure out broadcasting here... The shape of the matrix:
           # [None from np.atleast_1s, len(frequencies_MHz)=1, lmax + 1, 2 * lmax + 1]
        self.baseline_lmax = self.baseline_alm.shape[0] - 1

    def __attrs_post_init__(self):
        if not isinstance(self.frequency_MHz, float | int):
            raise TypeError("frequency_MHz must be a float or integer")
        if self.baseline_alm is None:
            self.generate_baseline_cache()
        elif self.baseline_lmax is None:
            self.baseline_lmax=self.baseline_alm.shape[0] - 1
        else:
            if self.baseline_lmax != self.baseline_alm.shape[0] - 1:
                raise ValueError(
                    "The defined harmonic degree baseline_lmax does not "
                    "correspond to the defined baseline_alm coefficients"
                )
        self.sky_lmax = self.sky_alm.shape[0] - 1
        self.power_beam_lmax = self.power_beam_alm.shape[0] - 1
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )

    @property
    def sky_m_values(self):
        return self.triple_sh_integrator.m1_values

    @property
    def sky_m_index_map(self):
        return self.triple_sh_integrator.m1_index_map

    @property
    def num_sky_ms(self):
        return len(self.sky_m_values)

    def mmodes(
        self,
    ):
        if self.method == "grid":
            return self.triple_sh_integrator.grid_integrate(
                alm1=self.sky_alm, alm2=self.baseline_alm, alm3=self.power_beam_alm
            )
        elif self.method == "gaunt_sparse":
            return self.triple_sh_integrator.sparse_gaunt_einsum(
                sparse_alm1=sparse.COO.from_numpy(self.sky_alm),
                sparse_alm2=sparse.COO.from_numpy(self.baseline_alm),
                sparse_alm3=sparse.COO.from_numpy(self.power_beam_alm),
            )
        elif self.method == "gaunt":
            return self.triple_sh_integrator.gaunt_integrate(
                alm1=self.sky_alm, alm2=self.baseline_alm, alm3=self.power_beam_alm
            )

    def visibilities(self):
        mmodes = self.mmodes()
        return mmodes_to_visibilities(mmodes, m1max=self.sky_lmax, ms=self.sky_m_values)

SkyVisProjector

Bases: VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
sky_lmax int
required
frequencies_MHz ndarray
required
baseline_lmax int | None
None
power_beam_lmax int | None
None
sky_absm_limits tuple[int, Optional[int]]
(0, None)
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
batch_parallel_mode Literal['channel', 'gaunt']
'channel'
Source code in src/serval/core.py
@define(eq=False)
class SkyVisProjector(VisProjector):
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    sky_lmax: int
    frequencies_MHz: np.ndarray

    baseline_lmax: int | None = None
    power_beam_lmax: int | None = None
    sky_absm_limits: tuple[int, int | None] = (0, None)
    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    batch_parallel_mode: Literal["channel", "gaunt"] = "channel"

    _to_attrs = [
        "latitude",
        "longitude",
        "sky_lmax",
        "power_beam_lmax",
        "baseline_lmax",
        "baseline_enu",
    ]
    _to_zarr_store = ["frequencies_MHz"]

    def __attrs_post_init__(self):
        if self.generate_baseline_cache_on_init:
            self.generate_baseline_cache()
        if self.power_beam_lmax is None:
            if self.baseline_lmax is not None:
                if self.power_beam_lmax != self.sky_lmax - self.baseline_lmax:
                    warnings.warn(
                        "The value of power_beam_lmax is changed to the difference "
                        "of baseline_lmax from sky_lmax.",
                        stacklevel=2,  # points to caller
                    )
                    self.power_beam_lmax = self.sky_lmax - self.baseline_lmax
            else:
                raise ValueError(
                    "Cannot determine power_beam_lmax with inputs given.\n"
                    "Either baseline_cache needs to be computed on input, baseline_lmax must be "
                    "specified or power_beam_lmax must be specified."
                )
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )
        if self.generate_gaunt_cache_on_init:
            self.generate_gaunt_cache()

    def generate_baseline_cache(self):
        self.baseline_cache = tirs_baseline_alm(
            enu=self.baseline_enu,
            frequencies_MHz=self.frequencies_MHz,
            latitude=self.latitude,
            longitude=self.longitude,
            lmax=self.baseline_lmax,
        )
        # Can assert this is the same as input if not none.
        self.baseline_lmax = self.baseline_cache.shape[1] - 1
        # Only changed here:
        if self.power_beam_lmax != self.sky_lmax - self.baseline_lmax:
            if self.power_beam_lmax is not None:
                warnings.warn(
                    "The value of power_beam_lmax is changed to the difference "
                    "of baseline_lmax from sky_lmax.",
                    stacklevel=2,  # points to caller
                )
            self.power_beam_lmax = self.sky_lmax - self.baseline_lmax

    def sky_linear_mmode_generator(self, beam_alms):
        beam_alms = extend_dimensions_if_one_batch(beam_alms, 3)
        if beam_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the number of frequency bins."
            )
        if (
            beam_alms.shape[1] != self.power_beam_lmax + 1
            or beam_alms.shape[2] != 2 * self.power_beam_lmax + 1
        ):
            raise ValueError(
                "The beam_alms coefficients do not correspond to the expected power_beam_lmax "
                "(sky_lmax - baseline_lmax). Either number ls or ms is incorrect."
            )
        self.triple_sh_integrator.generate_integrator_cache_23(
            alm2=self.baseline_cache,
            alm3=beam_alms,
            batch_parallel_mode=self.batch_parallel_mode,
        )
        self.integrator_cache = self.triple_sh_integrator.linear_integrator_cache_23
        return self.integrator_cache

    def mmodes(self, sky_alms, beam_alms=None):
        sky_alms = extend_dimensions_if_one_batch(sky_alms, 3)
        if sky_alms.shape[0] != len(self.frequencies_MHz):
            raise ValueError(
                "The sky_alms coefficients do not correspond to the number of frequency bins."
            )
        if sky_alms.shape[1] != self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the inputted sky_lmax."
            )
        if sky_alms.shape[2] != 2 * self.sky_lmax + 1:
            raise ValueError(
                "The sky_alms coefficients do not correspond to the expected harmonic orders "
                "for the inputted sky_lmax (ms = 2 * sky_lmax + 1)."
            )
        cut_sky_alms = sky_alms[
            ..., list(self.sky_m_index_map.keys())
        ]  # Need to select only the valid ms

        if beam_alms is None and self.integrator_cache is None:
            raise ValueError("Must specify either beam_alms or integrator cache.")
        else:
            if beam_alms is None:
                integrator_cache = self.integrator_cache
            else:
                integrator_cache = self.sky_linear_mmode_generator(beam_alms=beam_alms)
            integrator_cache = extend_dimensions_if_one_batch(integrator_cache, 3)
            return self.triple_sh_integrator.batch_gaunt_integrate_cached_23(
                alm1=cut_sky_alms, integrator_cache=integrator_cache
            )

    def visibilities(
        self,
        sky_alms,
        beam_alms=None,
    ):
        mmodes = self.mmodes(beam_alms=beam_alms, sky_alms=sky_alms)
        return mmodes_to_visibilities(mmodes, self.sky_lmax, ms=self.sky_m_values)

    def setup_zarr_store(
        self,
        store_location: str | Path,
        frequency_chunksize: int | None = None,
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        integrator_cache_shape = (
            self.frequencies_MHz.size,
            2 * self.sky_lmax + 1,
            self.sky_lmax + 1,
        )
        integrator_cache_chunks = (
            (
                frequency_chunksize
                if frequency_chunksize is not None
                else self.frequencies_MHz.size
            ),
            1,  # per m-chunking for parallel writes
            self.power_beam_lmax + 1,
        )
        self._setup_zarr_store(
            store_location=store_location,
            group_path=group_path,
            integrator_cache_shape=integrator_cache_shape,
            integrator_cache_chunks=integrator_cache_chunks,
            store_metadata_attrs=store_metadata_attrs,
        )

TripleSHIntegrator

A class for performing integrals of the products of three functions on the sphere expressed as their spherical harmonic transformations.

This class provides methods for this integration performed with a variety of techniques using Gaunt coefficients and grid-based approaches. It supports caching for efficient repeated computations when changing only one of the functions. Additionally it allows for computing the Fourier transform of these integrals in an azimuthal rotation angle of one of the functions (always the 1st) around the others, ie. the \(m\)-modes.

That is,

\[ \begin{align} \mathcal{V} &= \int\diff\Omega~f_1(\dirvec)f_2(\dirvec)f_3(\dirvec) \\ &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}} a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3} \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec) Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \end{align} \]

where

\[ f_1(\dirvec) = \sum_{\ell_1=0}^{l_{1,\mathrm{max}}} \sum_{m_1=-\ell_1}^{\ell_1} a_{\ell_1 m_1} Y_{\ell_1 m_1}(\dirvec) \]

etc.

For the \(m\)-modes:

\[ \begin{align} \mathcal{V}_{m_1} &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_2 m_3}} a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3} \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec) Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \\ &= \int \frac{\diff\phi}{2\pi} \exp[-im_1\phi] \int\diff\Omega~f_1( \matr{\mathcal{R}}_Z(-\phi)\dirvec)f_2(\dirvec)f_3(\dirvec) \end{align} \]

where \(\matr{\mathcal{R}}_Z(-\phi)\) is a passive basis rotation about the polar axis of the sphere.

Notes

The index of the functions only matters in differentiating from the others the first function/set of coefficients as the integration can be split up in \(m_1\) values. Additionally integrals can be performed rotating this function azimuthally around the others. That is, for the purposes of this code, this represents the sky.

Parameters:

Name Type Description Default
l1max int

Maximum spherical harmonic degree for the first set of coefficients.

required
l2max int

Maximum spherical harmonic degree for the second set of coefficients.

required
l3max int

Maximum spherical harmonic degree for the third set of coefficients.

required
absm1_limits tuple[int, int | None]

Inclusive lower and exclusive upper limits for the absolute value of the m1 index. Defaults to (0, None), ie, all m1 values.

(0, None)
generate_cache_on_init bool

Whether to generate the Gaunt coefficient cache during initialization, by default False.

False

Attributes:

Name Type Description
m1_values list[int]

List of m1 values that integrations are performed for.

m1_index_map dict[int, int]

Mapping of m1 values to their indices in a completed set of m1 values.

linear_integrator_cache_12 ndarray[tuple[Any, ...], dtype[complex128]] | None
linear_integrator_cache_23 ndarray[tuple[Any, ...], dtype[complex128]] | None
gaunt_cache CachingGauntContractor | CachingGauntContractorOpt12 | None
m1_global_index list[int]

Methods:

Name Description
generate_gaunt_cache

Generates the Gaunt coefficient cache.

clear_gaunt_cache

Clears the Gaunt coefficient cache.

sparse_gaunt_einsum

Performs sparse tensor contraction using computed Gaunt coefficients.

grid_integrate

Performs integration using a grid-based approach.

gaunt_integrate

Performs integration using a direct sum over precomputed Gaunt coefficients.

linear_gaunt_integrator_12

Computes a linear integrator for fixed first and second sets of coefficients.

generate_integrator_cache_12

batch_parallel_mode="channel") Generates and caches the linear integrator for fixed first and second sets of coefficients.

batch_gaunt_integrate_cached_12

Uses a integrator cache for fixed first and second sets of coefficients to compute integrals over a batch of the third set of coefficients.

linear_gaunt_integrator_23

Computes a linear integrator for fixed second and third sets of coefficients.

generate_integrator_cache_23

Generates and caches the linear integrator for fixed second and third sets of coefficients.

batch_gaunt_integrate_cached_12

Uses a integrator cache for fixed second and third sets of coefficients to compute integrals over a bacth of the first set of coefficients.

Source code in src/serval/core.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
@define(eq=False)
class TripleSHIntegrator:
    r"""
    A class for performing integrals of the products of three functions on the
    sphere expressed as their spherical harmonic transformations.

    This class provides methods for this integration performed with a variety
    of techniques using Gaunt coefficients and grid-based approaches. It
    supports caching for efficient repeated computations when changing only
    one of the functions. Additionally it allows for computing the Fourier
    transform of these integrals in an azimuthal rotation angle of one of
    the functions (always the 1st) around the others, ie. the $m$-modes.

    That is,

    $$
    \begin{align}
    \mathcal{V} &= \int\diff\Omega~f_1(\dirvec)f_2(\dirvec)f_3(\dirvec) \\
                &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                  a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}
                  \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec)
                  Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec),
    \end{align}
    $$

    where

    $$
    f_1(\dirvec) = \sum_{\ell_1=0}^{l_{1,\mathrm{max}}}
                   \sum_{m_1=-\ell_1}^{\ell_1} a_{\ell_1 m_1} Y_{\ell_1 m_1}(\dirvec)
    $$

    etc.

    For the $m$-modes:

    $$
    \begin{align}
    \mathcal{V}_{m_1} &= \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_2 m_3}}
                        a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}
                        \int\diff\Omega~Y_{\ell_1 m_1}(\dirvec)
                        Y_{\ell_2 m_2}(\dirvec) Y_{\ell_3 m_3}(\dirvec), \\
                      &=
                        \int \frac{\diff\phi}{2\pi} \exp[-im_1\phi] \int\diff\Omega~f_1(
                        \matr{\mathcal{R}}_Z(-\phi)\dirvec)f_2(\dirvec)f_3(\dirvec)
    \end{align}
    $$

    where $\matr{\mathcal{R}}_Z(-\phi)$ is a passive basis rotation about the polar axis
    of the sphere.

    Notes
    -----
    The index of the functions only matters in differentiating from the others the first
    function/set of coefficients as the integration can be split up in $m_1$ values.
    Additionally integrals can be performed rotating this function azimuthally around
    the others. That is, for the purposes of this code, this represents the sky.


    Parameters
    ----------
    l1max : int
        Maximum spherical harmonic degree for the first set of coefficients.
    l2max : int
        Maximum spherical harmonic degree for the second set of coefficients.
    l3max : int
        Maximum spherical harmonic degree for the third set of coefficients.
    absm1_limits : tuple[int, int | None], optional
        Inclusive lower and exclusive upper limits for the absolute value of
        the m1 index. Defaults to (0, None), ie, all m1 values.
    generate_cache_on_init : bool, optional
        Whether to generate the Gaunt coefficient cache during initialization,
        by default False.

    Attributes
    ----------
    l1max : int
        Maximum spherical harmonic degree for the first set of coefficients.
    l2max : int
        Maximum spherical harmonic degree for the second set of coefficients.
    l3max : int
        Maximum spherical harmonic degree for the third set of coefficients.
    absm1_limits : tuple[int, int | None]
        Inclusive lower and exclusive upper limits for the absolute value of
        the m1 index.
    generate_cache_on_init : bool
        Whether to generate the Gaunt coefficient cache during initialization.
    m1_values : list[int]
        List of m1 values that integrations are performed for.
    m1_index_map : dict[int, int]
        Mapping of m1 values to their indices in a completed set of m1 values.

    Methods
    -------
    generate_gaunt_cache(cache_type=CachingGauntContractor)
        Generates the Gaunt coefficient cache.
    clear_gaunt_cache()
        Clears the Gaunt coefficient cache.
    sparse_gaunt_einsum(sparse_alm1, sparse_alm2, sparse_alm3, sum_m1=False, gaunts=None)
        Performs sparse tensor contraction using computed Gaunt coefficients.
    grid_integrate(alm1, alm2, alm3, sum_m1=False)
        Performs integration using a grid-based approach.
    gaunt_integrate(alm1, alm2, alm3, sum_m1=False)
        Performs integration using a direct sum over precomputed Gaunt coefficients.
    linear_gaunt_integrator_12(alm1, alm2, contract3=None, sum_m1=False)
        Computes a linear integrator for fixed first and second sets of coefficients.
    generate_integrator_cache_12(alm1, alm2, contract3=None, release_gaunt_cache=True,
                                 batch_parallel_mode="channel")
        Generates and caches the linear integrator for fixed first and second sets of
        coefficients.
    batch_gaunt_integrate_cached_12(alm3, integrator_cache=None, sum_m1=False)
        Uses a integrator cache for fixed first and second sets of coefficients to
        compute integrals over a batch of the third set of coefficients.
    linear_gaunt_integrator_23(alm2, alm3, sum_m1=False)
        Computes a linear integrator for fixed second and third sets of coefficients.
    generate_integrator_cache_23(alm2, alm3)
        Generates and caches the linear integrator for fixed second and third sets of
        coefficients.
    batch_gaunt_integrate_cached_12(alm1, integrator_cache=None, sum_m1=False)
        Uses a integrator cache for fixed second and third sets of coefficients to
        compute integrals over a bacth of the first set of coefficients.
    """

    l1max: int
    l2max: int
    l3max: int

    absm1_limits: tuple[int, int | None] = (0, None)
    generate_cache_on_init: bool = False

    linear_integrator_cache_12: npt.NDArray[np.complex128] | None = field(
        init=False, default=None
    )
    linear_integrator_cache_23: npt.NDArray[np.complex128] | None = field(
        init=False, default=None
    )
    gaunt_cache: CachingGauntContractor | CachingGauntContractorOpt12 | None = field(
        init=False, default=None
    )
    m1_values: list[int] = field(init=False)
    m1_index_map: dict[int, int] = field(init=False)
    m1_global_index: list[int] = field(init=False)

    _cache_types: ClassVar[
        dict[str, CachingGauntContractor | CachingGauntContractorOpt12]
    ] = {
        "opt12": CachingGauntContractorOpt12,
        "generic": CachingGauntContractor,
    }

    def __attrs_post_init__(self):
        if self.absm1_limits[1] is None:
            self.absm1_limits = (self.absm1_limits[0], self.l1max + 1)
        if self.generate_cache_on_init:
            self.generate_gaunt_cache()
        self.m1_values, self.m1_index_map = m1_indexing(
            self.l1max, self.absm1_limits[0], self.absm1_limits[1]
        )
        self.m1_global_index = list(self.m1_index_map.keys())

    def generate_gaunt_cache(
        self, cache_type_tag: Literal["opt12", "generic"] = "generic"
    ):
        """Generates the Gaunt coefficient cache."""
        self.gaunt_cache = self._cache_types[cache_type_tag](
            self.l1max,
            self.l2max,
            self.l3max,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def clear_gaunt_cache(self):
        """Clears the Gaunt coefficient cache."""
        self.gaunt_cache = None

    def sparse_gaunt_einsum(
        self,
        sparse_alm1: sparse.SparseArray,
        sparse_alm2: sparse.SparseArray,
        sparse_alm3: sparse.SparseArray,
        sum_m1: bool = False,
        gaunts: sparse.COO | None = None,
    ) -> sparse.COO:
        r"""
        Perform the triple integral using a sparse tensor contraction.

        Compute the triple integral by generating or using a sparse
        representation of the Gaunt coefficients and performing the sum
        with a sparse einsum. This is mostly a cross-checking function
        and is not performant.

        Parameters
        ----------
        sparse_alm1 : sparse.SparseArray
            Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).
        sparse_alm2 : sparse.SparseArray
            Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).
        sparse_alm3 : sparse.SparseArray
            Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.
        gaunts : sparse.COO | None, optional
            Sparse representation of the Gaunt coefficients if already.
            computed, by default None.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        if gaunts is None:
            gaunts = gaunts_coo(self.l1max, self.l2max, self.l3max)
        if sum_m1:
            return sparse.einsum(
                "ijklmn, ij, kl, mn -> ", gaunts, sparse_alm1, sparse_alm2, sparse_alm3
            )
        else:
            return sparse.einsum(
                    "ijklmn, ij, kl, mn -> j",
                    gaunts,
                    sparse_alm1,
                    sparse_alm2,
                    sparse_alm3,
                ).todense()

    def grid_integrate(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        r"""Perform the triple integral using a grid-based approach.

        Here the product of the second and third fields are evaluated on a
        consistent resolution grid, decomposed to conjugate spherical
        harmonics and multipled by the first set of coefficients. This more
        standard m-mode approach is performant for single integrals
        but, as implemented here, can't cache intermediate results and
        therefore not suitable for batching.


        Parameters
        ---------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        alm3 : npt.NDArray[np.complex128]
            The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        grid_lmax = int(np.ceil(self.l1max + self.l2max + self.l3max))
        alm1, alm2, alm3 = broadcast_bandlimits(alm1, alm2, alm3, lmax=grid_lmax)
        static_grid = grid_template(grid_lmax)
        static_grid.data[:] = 1.0
        static_grid *= array_synthesis(alm2) * array_synthesis(alm3)
        static_alm = analysis(static_grid, conjugate=True, return_sparse=False)
        out_mmodes = compute_mmodes(static_alm, alm1)
        return out_mmodes[np.asarray(self.m1_values) + grid_lmax]

    def gaunt_integrate(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128] | float:
        r"""Perform the triple integral by doing and inplace
        sum-product over gaunt coefficients.

        $$
        \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                    \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                    a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}.
        $$

        Here the sum is performed inplace with no caching.

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        alm3 : npt.NDArray[np.complex128]
            The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128] | np.complex128
            If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
            sum over all m-modes.
        """
        return gaunt_dot123(
            alm1=alm1,
            alm2=alm2,
            alm3=alm3,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def linear_gaunt_integrator_12(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        contract3: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        r"""Generate a linear operator that, with fixed first and second set of coefficients
        performs the integral when sum-producted with the third set of coefficients.

        $$
        \matr{L}_{m_1}^{\ell_3 m_3} =
                    \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}}
                    \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                    a_{\ell_1 m_1}a_{\ell_2 m_2}.
        $$

        If requested, additionally contract with a linear operator along the
        m3 axis (e.g. a rotation with a Wigner-d matrix),

        $$
        \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3}
                                             \matr{L}_{m_1}^{\ell_3 m_3}
        $$

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
        alm2 : npt.NDArray[np.complex128]
            The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
        contract3 : npt.NDArray[np.complex128] | None, optional
            An optional linear operator along the m3 axis, shape
            (l3max+1, 2*l3max+1, Nm3prime<=2*l3max+1), by default None
        sum_m1 : bool, optional
            Whether to sum over the m1's before output, by default False.

        Returns
        -------
        npt.NDArray[np.complex128]
            By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array
            where the m1 axis has been summed over. If contract3 is present, this will instead
            be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent
            contraction on the m3 axis.
        """
        tensor = gaunt_dot12(
            alm1=alm1,
            alm2=alm2,
            l3max=self.l3max,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )
        if contract3 is not None:
            return integrator12_contract3(tensor, contract3)
        else:
            return tensor

    def generate_integrator_cache_12(
        self,
        alm1: npt.NDArray[np.complex128],
        alm2: npt.NDArray[np.complex128],
        contract3: npt.NDArray[np.complex128] | None = None,
        release_gaunt_cache: bool = True,
        batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel-opt",
    ):
        """_summary_

        Parameters
        ----------
        alm1 : npt.NDArray[np.complex128]
            _description_
        alm2 : npt.NDArray[np.complex128]
            _description_
        contract3 : npt.NDArray[np.complex128] | None, optional
            _description_, by default None
        release_gaunt_cache : bool, optional
            _description_, by default True
        batch_parallel_mode : Literal["channel-opt", "channel", "gaunt"], optional
            _description_, by default "channel-opt"
        """
        cache_type_tag: Literal["opt12", "generic"] = (
            "opt12" if batch_parallel_mode == "channel-opt" else "generic"
        )
        if self.gaunt_cache is None:
            self.generate_gaunt_cache(cache_type_tag)
        if not isinstance(self.gaunt_cache, self._cache_types[cache_type_tag]):
            raise TypeError(f"gaunt_cache must be a {self._cache_types[cache_type_tag]} object.")
        assert self.gaunt_cache is not None
        alm1 = extend_dimensions_if_one_batch(alm1, 3)
        alm2 = extend_dimensions_if_one_batch(alm2, 3)
        result = np.zeros(
            (
                alm1.shape[0],
                len(self.m1_values),
                self.l3max + 1,
                2 * self.l3max + 1,
            ),
            dtype=np.complex128,
        )
        if batch_parallel_mode in ["channel", "channel-opt"]:
            self.gaunt_cache.batch_parallel_batch_dot12(alm1, alm2, result)
        else:
            self.gaunt_cache.gaunt_parallel_batch_dot12(alm1, alm2, result)
        if release_gaunt_cache:
            self.clear_gaunt_cache()
        if contract3 is not None:
            result = integrator12_contract3(result, contract3)
        if result.shape[0] == 1:
            self.linear_integrator_cache_12 = result[0, ...]
        else:
            self.linear_integrator_cache_12 = result

    def batch_gaunt_integrate_cached_12(
        self,
        alm3: npt.NDArray[np.complex128],
        integrator_cache: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ):
        if integrator_cache is None:
            if self.linear_integrator_cache_12 is None:
                raise ValueError("Integrator cache must be provided or pre-computed")
            else:
                tensor = self.linear_integrator_cache_12
        else:
            tensor = integrator_cache
        result = integrator12_dot3(tensor, alm3)
        if sum_m1:
            return np.sum(result, axis=0)
        else:
            return result

    def linear_gaunt_integrator_23(
        self,
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        sum_m1: bool = False,
    ) -> npt.NDArray[np.complex128]:
        return gaunt_dot23(
            alm2=alm2,
            alm3=alm3,
            l1max=self.l1max,
            sum_m1=sum_m1,
            absm1_lower=self.absm1_limits[0],
            absm1_upper=self.absm1_limits[1],
        )

    def generate_integrator_cache_23(
        self,
        alm2: npt.NDArray[np.complex128],
        alm3: npt.NDArray[np.complex128],
        release_gaunt_cache: bool = True,
        batch_parallel_mode: Literal["channel", "gaunt"] = "channel",
    ):
        if self.gaunt_cache is None:
            self.generate_gaunt_cache()
        if not isinstance(self.gaunt_cache, CachingGauntContractor):
            raise TypeError("gaunt_cache must be a CachingGauntContractor object.")
        alm2 = extend_dimensions_if_one_batch(alm2, 3)
        alm3 = extend_dimensions_if_one_batch(alm3, 3)
        result = np.zeros(
            (alm2.shape[0], len(self.m1_values), self.l1max + 1), dtype=np.complex128
        )
        if batch_parallel_mode == "channel":
            self.gaunt_cache.batch_parallel_batch_dot23(alm2, alm3, result)
        else:
            self.gaunt_cache.gaunt_parallel_batch_dot23(alm2, alm3, result)
        if release_gaunt_cache:
            self.clear_gaunt_cache()
        if result.shape[0] == 1:
            self.linear_integrator_cache_23 = result[0, ...]
        else:
            self.linear_integrator_cache_23 = result

    def batch_gaunt_integrate_cached_23(
        self,
        alm1: npt.NDArray[np.complex128],
        integrator_cache: npt.NDArray[np.complex128] | None = None,
        sum_m1: bool = False,
    ):
        if integrator_cache is None:
            if self.linear_integrator_cache_23 is None:
                raise ValueError("Integrator cache must be provided or pre-computed")
            else:
                tensor = self.linear_integrator_cache_23
        else:
            tensor = integrator_cache
        result = integrator23_dot1(tensor, alm1)
        if sum_m1:
            return np.sum(result, axis=0)
        else:
            return result

clear_gaunt_cache()

Clears the Gaunt coefficient cache.

Source code in src/serval/core.py
def clear_gaunt_cache(self):
    """Clears the Gaunt coefficient cache."""
    self.gaunt_cache = None

gaunt_integrate(alm1, alm2, alm3, sum_m1=False)

Perform the triple integral by doing and inplace sum-product over gaunt coefficients.

\[ \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}} \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~ a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}. \]

Here the sum is performed inplace with no caching.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
alm3 NDArray[complex128]

The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def gaunt_integrate(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128] | float:
    r"""Perform the triple integral by doing and inplace
    sum-product over gaunt coefficients.

    $$
    \mathcal{V} = \sum_{\substack{\ell_1 \ell_2 \ell_3 \\ m_1 m_2 m_3}}
                \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                a_{\ell_1 m_1}a_{\ell_2 m_2}a_{\ell_3 m_3}.
    $$

    Here the sum is performed inplace with no caching.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    alm3 : npt.NDArray[np.complex128]
        The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    return gaunt_dot123(
        alm1=alm1,
        alm2=alm2,
        alm3=alm3,
        sum_m1=sum_m1,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )

generate_gaunt_cache(cache_type_tag='generic')

Generates the Gaunt coefficient cache.

Source code in src/serval/core.py
def generate_gaunt_cache(
    self, cache_type_tag: Literal["opt12", "generic"] = "generic"
):
    """Generates the Gaunt coefficient cache."""
    self.gaunt_cache = self._cache_types[cache_type_tag](
        self.l1max,
        self.l2max,
        self.l3max,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )

generate_integrator_cache_12(alm1, alm2, contract3=None, release_gaunt_cache=True, batch_parallel_mode='channel-opt')

summary

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

description

required
alm2 NDArray[complex128]

description

required
contract3 NDArray[complex128] | None

description, by default None

None
release_gaunt_cache bool

description, by default True

True
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt']

description, by default "channel-opt"

'channel-opt'
Source code in src/serval/core.py
def generate_integrator_cache_12(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    contract3: npt.NDArray[np.complex128] | None = None,
    release_gaunt_cache: bool = True,
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel-opt",
):
    """_summary_

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        _description_
    alm2 : npt.NDArray[np.complex128]
        _description_
    contract3 : npt.NDArray[np.complex128] | None, optional
        _description_, by default None
    release_gaunt_cache : bool, optional
        _description_, by default True
    batch_parallel_mode : Literal["channel-opt", "channel", "gaunt"], optional
        _description_, by default "channel-opt"
    """
    cache_type_tag: Literal["opt12", "generic"] = (
        "opt12" if batch_parallel_mode == "channel-opt" else "generic"
    )
    if self.gaunt_cache is None:
        self.generate_gaunt_cache(cache_type_tag)
    if not isinstance(self.gaunt_cache, self._cache_types[cache_type_tag]):
        raise TypeError(f"gaunt_cache must be a {self._cache_types[cache_type_tag]} object.")
    assert self.gaunt_cache is not None
    alm1 = extend_dimensions_if_one_batch(alm1, 3)
    alm2 = extend_dimensions_if_one_batch(alm2, 3)
    result = np.zeros(
        (
            alm1.shape[0],
            len(self.m1_values),
            self.l3max + 1,
            2 * self.l3max + 1,
        ),
        dtype=np.complex128,
    )
    if batch_parallel_mode in ["channel", "channel-opt"]:
        self.gaunt_cache.batch_parallel_batch_dot12(alm1, alm2, result)
    else:
        self.gaunt_cache.gaunt_parallel_batch_dot12(alm1, alm2, result)
    if release_gaunt_cache:
        self.clear_gaunt_cache()
    if contract3 is not None:
        result = integrator12_contract3(result, contract3)
    if result.shape[0] == 1:
        self.linear_integrator_cache_12 = result[0, ...]
    else:
        self.linear_integrator_cache_12 = result

grid_integrate(alm1, alm2, alm3, sum_m1=False)

Perform the triple integral using a grid-based approach.

Here the product of the second and third fields are evaluated on a consistent resolution grid, decomposed to conjugate spherical harmonics and multipled by the first set of coefficients. This more standard m-mode approach is performant for single integrals but, as implemented here, can't cache intermediate results and therefore not suitable for batching.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
alm3 NDArray[complex128]

The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def grid_integrate(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128]:
    r"""Perform the triple integral using a grid-based approach.

    Here the product of the second and third fields are evaluated on a
    consistent resolution grid, decomposed to conjugate spherical
    harmonics and multipled by the first set of coefficients. This more
    standard m-mode approach is performant for single integrals
    but, as implemented here, can't cache intermediate results and
    therefore not suitable for batching.


    Parameters
    ---------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    alm3 : npt.NDArray[np.complex128]
        The third set of spherical harmonic coefficients with shape (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    grid_lmax = int(np.ceil(self.l1max + self.l2max + self.l3max))
    alm1, alm2, alm3 = broadcast_bandlimits(alm1, alm2, alm3, lmax=grid_lmax)
    static_grid = grid_template(grid_lmax)
    static_grid.data[:] = 1.0
    static_grid *= array_synthesis(alm2) * array_synthesis(alm3)
    static_alm = analysis(static_grid, conjugate=True, return_sparse=False)
    out_mmodes = compute_mmodes(static_alm, alm1)
    return out_mmodes[np.asarray(self.m1_values) + grid_lmax]

linear_gaunt_integrator_12(alm1, alm2, contract3=None, sum_m1=False)

Generate a linear operator that, with fixed first and second set of coefficients performs the integral when sum-producted with the third set of coefficients.

\[ \matr{L}_{m_1}^{\ell_3 m_3} = \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}} \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~ a_{\ell_1 m_1}a_{\ell_2 m_2}. \]

If requested, additionally contract with a linear operator along the m3 axis (e.g. a rotation with a Wigner-d matrix),

\[ \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3} \matr{L}_{m_1}^{\ell_3 m_3} \]

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).

required
alm2 NDArray[complex128]

The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).

required
contract3 NDArray[complex128] | None

An optional linear operator along the m3 axis, shape (l3max+1, 2l3max+1, Nm3prime<=2l3max+1), by default None

None
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False

Returns:

Type Description
NDArray[complex128]

By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array where the m1 axis has been summed over. If contract3 is present, this will instead be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent contraction on the m3 axis.

Source code in src/serval/core.py
def linear_gaunt_integrator_12(
    self,
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    contract3: npt.NDArray[np.complex128] | None = None,
    sum_m1: bool = False,
) -> npt.NDArray[np.complex128]:
    r"""Generate a linear operator that, with fixed first and second set of coefficients
    performs the integral when sum-producted with the third set of coefficients.

    $$
    \matr{L}_{m_1}^{\ell_3 m_3} =
                \sum_{\substack{\ell_1 \ell_2 \\ m_1 m_2}}
                \mathcal{G}^{~~\ell_1~~\ell_2~~\ell_3}_{m_1 m_2 m_3}~
                a_{\ell_1 m_1}a_{\ell_2 m_2}.
    $$

    If requested, additionally contract with a linear operator along the
    m3 axis (e.g. a rotation with a Wigner-d matrix),

    $$
    \matr{L}_{m_1}^{\ell_3 m^\prime_3} = \sum_{m_3} \matr{W}^{\ell_3 m^\prime_3}_{m_3}
                                         \matr{L}_{m_1}^{\ell_3 m_3}
    $$

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        The first set of spherical harmonic coefficients with shape (l1max+1, Nm1).
    alm2 : npt.NDArray[np.complex128]
        The second set of spherical harmonic coefficients with shape (l2max+1, 2*l2max+1).
    contract3 : npt.NDArray[np.complex128] | None, optional
        An optional linear operator along the m3 axis, shape
        (l3max+1, 2*l3max+1, Nm3prime<=2*l3max+1), by default None
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.

    Returns
    -------
    npt.NDArray[np.complex128]
        By default (Nm1, Nl3, nm3) linear operator. If sum_m1=True, an (Nl3, Nm3) array
        where the m1 axis has been summed over. If contract3 is present, this will instead
        be an (Nm1, Nl3, Nm3prime) or (Nl3, Nm3prime) array resulting from the subsequent
        contraction on the m3 axis.
    """
    tensor = gaunt_dot12(
        alm1=alm1,
        alm2=alm2,
        l3max=self.l3max,
        sum_m1=sum_m1,
        absm1_lower=self.absm1_limits[0],
        absm1_upper=self.absm1_limits[1],
    )
    if contract3 is not None:
        return integrator12_contract3(tensor, contract3)
    else:
        return tensor

sparse_gaunt_einsum(sparse_alm1, sparse_alm2, sparse_alm3, sum_m1=False, gaunts=None)

Perform the triple integral using a sparse tensor contraction.

Compute the triple integral by generating or using a sparse representation of the Gaunt coefficients and performing the sum with a sparse einsum. This is mostly a cross-checking function and is not performant.

Parameters:

Name Type Description Default
sparse_alm1 SparseArray

Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).

required
sparse_alm2 SparseArray

Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).

required
sparse_alm3 SparseArray

Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).

required
sum_m1 bool

Whether to sum over the m1's before output, by default False.

False
gaunts COO | None

Sparse representation of the Gaunt coefficients if already. computed, by default None.

None

Returns:

Type Description
NDArray[complex128] | complex128

If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar sum over all m-modes.

Source code in src/serval/core.py
def sparse_gaunt_einsum(
    self,
    sparse_alm1: sparse.SparseArray,
    sparse_alm2: sparse.SparseArray,
    sparse_alm3: sparse.SparseArray,
    sum_m1: bool = False,
    gaunts: sparse.COO | None = None,
) -> sparse.COO:
    r"""
    Perform the triple integral using a sparse tensor contraction.

    Compute the triple integral by generating or using a sparse
    representation of the Gaunt coefficients and performing the sum
    with a sparse einsum. This is mostly a cross-checking function
    and is not performant.

    Parameters
    ----------
    sparse_alm1 : sparse.SparseArray
        Sparse represenation of the first set of coefficients with shape (l1max+1, Nm1).
    sparse_alm2 : sparse.SparseArray
        Sparse represenation of the second set of coefficients with shape (l2max+1, 2*l2max+1).
    sparse_alm3 : sparse.SparseArray
        Sparse represenation of the third set of coefficients (l3max+1, 2*l3max+1).
    sum_m1 : bool, optional
        Whether to sum over the m1's before output, by default False.
    gaunts : sparse.COO | None, optional
        Sparse representation of the Gaunt coefficients if already.
        computed, by default None.

    Returns
    -------
    npt.NDArray[np.complex128] | np.complex128
        If sum_m1 = False, Nm1 array of the m-modes, otherwise, scalar
        sum over all m-modes.
    """
    if gaunts is None:
        gaunts = gaunts_coo(self.l1max, self.l2max, self.l3max)
    if sum_m1:
        return sparse.einsum(
            "ijklmn, ij, kl, mn -> ", gaunts, sparse_alm1, sparse_alm2, sparse_alm3
        )
    else:
        return sparse.einsum(
                "ijklmn, ij, kl, mn -> j",
                gaunts,
                sparse_alm1,
                sparse_alm2,
                sparse_alm3,
            ).todense()

VisProjector

Parameters:

Name Type Description Default
latitude float
required
longitude float
required
baseline_enu tuple[float, float, float]
required
frequencies_MHz ndarray
required
power_beam_lmax int | None
None
baseline_lmax int | None
None
sky_lmax int | None
None
sky_absm_limits tuple[int, Optional[int]]
(0, None)
generate_gaunt_cache_on_init bool
False
generate_baseline_cache_on_init bool
False
batch_parallel_mode Literal['channel-opt', 'channel', 'gaunt']
'channel'

Attributes:

Name Type Description
baseline_cache ndarray | None
integrator_cache ndarray | None
triple_sh_integrator TripleSHIntegrator
Source code in src/serval/core.py
@define(eq=False)
class VisProjector:
    latitude: float
    longitude: float
    baseline_enu: tuple[float, float, float]
    frequencies_MHz: np.ndarray

    power_beam_lmax: int | None = None
    baseline_lmax: int | None = None
    sky_lmax: int | None = None

    sky_absm_limits: tuple[int, int | None] = (0, None)

    generate_gaunt_cache_on_init: bool = False
    generate_baseline_cache_on_init: bool = False
    batch_parallel_mode: Literal["channel-opt", "channel", "gaunt"] = "channel"

    baseline_cache: np.ndarray | None = field(init=False, default=None)
    integrator_cache: np.ndarray | None = field(init=False, default=None)

    triple_sh_integrator: TripleSHIntegrator = field(init=False)

    _last_store_location: Path | None = field(init=False, default=None)
    _last_group_path: str | None = field(init=False, default=None)

    _to_attrs: ClassVar[list[str]] = []
    _to_zarr_store: ClassVar[list[str]] = []
    _opt_tag: ClassVar[str] = ""

    def update_integrator(self):
        self.triple_sh_integrator = TripleSHIntegrator(
            l1max=self.sky_lmax,
            l2max=self.baseline_lmax,
            l3max=self.power_beam_lmax,
            absm1_limits=self.sky_absm_limits,
            generate_cache_on_init=False,
        )

    def generate_gaunt_cache(self):
        if self.baseline_cache is None and self.baseline_lmax is None:
            self.generate_baseline_cache()
        self.update_integrator()
        if self.batch_parallel_mode == "channel-opt":
            self.triple_sh_integrator.generate_gaunt_cache(self._opt_tag)
        else:
            self.triple_sh_integrator.generate_gaunt_cache("generic")

    @property
    def sky_m_values(self):
        return self.triple_sh_integrator.m1_values

    @property
    def sky_m_index_map(self):
        return self.triple_sh_integrator.m1_index_map

    @property
    def sky_m_global_index(self):
        return self.triple_sh_integrator.m1_global_index

    @property
    def sky_m_index_slices(self):
        m_inds = self.sky_m_global_index
        nm_inds = m_inds[: len(m_inds) // 2]
        if len(nm_inds) != 0:  # If ms == [0,]
            pm_inds = m_inds[len(m_inds) // 2 :]
            return (
                [slice(0, self.num_sky_ms // 2), slice(self.num_sky_ms // 2, None)],
                [
                    slice(nm_inds[0], nm_inds[-1] + 1),
                    slice(pm_inds[0], pm_inds[-1] + 1),
                ],
            )
        else:
            return [
                slice(0, 1),
            ], [
                slice(m_inds[0], m_inds[0] + 1),
            ]

    @property
    def num_sky_ms(self):
        return len(self.sky_m_values)

    def _setup_zarr_store(
        self,
        store_location: str | Path,
        integrator_cache_shape: tuple[int, ...],
        integrator_cache_chunks: tuple[int, ...],
        group_path: str = r"/",
        store_metadata_attrs: dict | None = None,
    ):
        to_attrs_dict = {slot: getattr(self, slot) for slot in self._to_attrs}
        to_zarr_store_dict = {slot: getattr(self, slot) for slot in self._to_zarr_store}

        destination_dir = Path(store_location)

        self._last_store_location = destination_dir
        self._last_group_path = group_path

        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.create_group(store=store, path=group_path, overwrite=True)

        assert self.sky_lmax is not None

        if store_metadata_attrs is not None:
            if group_path != r"/":  # coulf check this more thoroughly
                root = zarr.group(store=store, path="/")
                root.attrs.update(store_metadata_attrs)
            else:
                grp.attrs.update(store_metadata_attrs)

        for k, v in to_attrs_dict.items():
            grp.attrs[k] = v

        for k, v in to_zarr_store_dict.items():
            arr = grp.create_array(
                name=k,
                shape=v.shape,
                dtype=v.dtype,
                overwrite=True,
            )
            arr[:] = v

        ms_completed = grp.create_array(
            name="ms_completed",
            chunks=1,
            shape=2 * self.sky_lmax + 1,
            dtype=bool,
            overwrite=True,
        )
        ms_completed[:] = False

        grp.create_array(
            name="integrator_cache",
            shape=integrator_cache_shape,
            chunks=integrator_cache_chunks,
            dtype=np.complex128,
            overwrite=True,
            compressors=zarr.codecs.BloscCodec(
                cname="zstd", clevel=3, shuffle=zarr.codecs.BloscShuffle.shuffle
            ),
        )

    def write_integrator_cache_to_store(
        self, store_location: str | Path | None = None, group_path: str | None = None
    ):
        if self.integrator_cache is None:
            raise ValueError("Cannot store integrator_cache because it is None.")
        if store_location is None and self._last_store_location is not None:
            destination_dir = self._last_store_location
        elif store_location is not None:
            destination_dir = Path(store_location)
        else:
            raise ValueError("Store location must be specified.")
        if group_path is None and self._last_group_path is not None:
            group_path = self._last_group_path
        elif group_path is None:
            group_path = r"/"  # Default to root group
        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.group(store=store, path=group_path)
        integrator_cache = grp["integrator_cache"]
        ms_completed = grp["ms_completed"]

        local_m_slices, global_m_slices = self.sky_m_index_slices
        for lslc, gslc in zip(local_m_slices, global_m_slices):  # noqa: B905
            integrator_cache[:, gslc] = self.integrator_cache[:, lslc]
            ms_completed[gslc] = True

    @classmethod
    def from_zarr_store(
        cls,
        store_location: str | Path,
        group_path: str = r"/",
        freq_slice: slice | None = None,
        sky_absm_limits=(0, None),
        load_cache=True,
    ):
        if freq_slice is None:
            freq_slice = slice(None, None, None)
        destination_dir = Path(store_location)
        store = zarr.storage.LocalStore(destination_dir)
        grp = zarr.group(store=store, path=group_path)
        init_dict = {k: grp.attrs[k] for k in cls._to_attrs}
        init_dict.update({k: grp[k][freq_slice] for k in cls._to_zarr_store})
        proj = cls(sky_absm_limits=sky_absm_limits, **init_dict)
        if load_cache:
            _, global_m_slices = proj.sky_m_index_slices
            cache_parts = []
            for gslc in global_m_slices:
                cache_parts.append(grp["integrator_cache"][freq_slice, gslc, ...])
                if not grp["ms_completed"][gslc].all():
                    warnings.warn(
                    "Zarr Store indicates not all requested sky ms computed.", stacklevel=2
                )  # points to caller
            proj.integrator_cache = np.concatenate(cache_parts, axis=1)
        return proj

extend_dimensions_if_one_batch(arr, dim)

If a single frequency is calculated then the alms and cache matrices will be missing the first dimension that corresponds to the number of frequencies. This function takes the arrays and adds a (1, ...) dimension

Source code in src/serval/core.py
def extend_dimensions_if_one_batch(
    arr: npt.NDArray[np.complex128], dim: int
) -> npt.NDArray[np.complex128]:
    """If a single frequency is calculated then the alms and cache matrices will be missing
    the first dimension that corresponds to the number of frequencies. This function takes the
    arrays and adds a (1, ...) dimension"""
    if arr.ndim < dim:
        arr = arr[None, ...]
    return arr

Rotate.py

Sht.py

Uitls.py

Gaunt Library

Core.py

gaunt_dot12(alm1, alm2, l3max, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute a projector for alm3 for the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over l1, l2, m2 and m1 (if requested).

TODO add formula.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

Spherical harmonic co-efficients in l1, m1.

required
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
l3max int

Maximum l3 to compute co-efficients up to.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128]

m1-mode alm3 projector as a numpy array of shape (Nm1, l3max+1, 2l3max+1) if sum_m1 is False or (l3max+1, 2l3max+1) is sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot12(
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    l3max: int,
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128]:
    """Compute a projector for alm3 for the integral of the triple product
    of spherical harmonics by computing the Gaunt co-efficients in-place
    and sum-producting over l1, l2, m2 and m1 (if requested).

    TODO add formula.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l1, m1.
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    l3max : int
        Maximum l3 to compute co-efficients up to.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128]
        m1-mode alm3 projector as a numpy array of shape (Nm1, l3max+1, 2*l3max+1)
        if sum_m1 is False or (l3max+1, 2*l3max+1) is sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = alm1.shape[0]
    nm1 = len(_gaunt.m1_indexing(alm1.shape[0] - 1, absm1_lower, absm1_upper)[0])
    result = np.zeros((nm1, l3max + 1, 2 * l3max + 1), dtype=np.complex128)
    _gaunt.inplace_dot12(alm1, alm2, result, l3max, absm1_lower, absm1_upper)
    if sum_m1:
        return np.sum(result, axis=0)
    else:
        return result

gaunt_dot123(alm1, alm2, alm3, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over all harmonic degrees and orders except m1, unless requested.

TODO add formula.

Parameters:

Name Type Description Default
alm1 NDArray[complex128]

Spherical harmonic co-efficients in l1, m1.

required
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
alm3 NDArray[complex128]

Spherical harmonic co-efficients in l3, m3.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128] | float

Numpy array, shape (Nm1), of m1-modes or their sum if sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot123(
    alm1: npt.NDArray[np.complex128],
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128] | float:
    """Compute the integral of the triple product of spherical harmonics
    by computing the Gaunt co-efficients in-place and sum-producting over all
    harmonic degrees and orders except m1, unless requested.

    TODO add formula.

    Parameters
    ----------
    alm1 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l1, m1.
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    alm3 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l3, m3.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128] | float
        Numpy array, shape (Nm1), of m1-modes or their sum if sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = alm1.shape[0]
    nm1 = len(_gaunt.m1_indexing(alm1.shape[0] - 1, absm1_lower, absm1_upper)[0])
    result = np.zeros(nm1, dtype=np.complex128)
    _gaunt.inplace_dot123(alm1, alm2, alm3, result, absm1_lower, absm1_upper)
    if sum_m1:
        return result.sum()
    else:
        return result

gaunt_dot23(alm2, alm3, l1max, sum_m1=False, absm1_lower=None, absm1_upper=None)

Compute a projector for alm1 for the integral of the triple product of spherical harmonics by computing the Gaunt co-efficients in-place and sum-producting over l2, m2, l3, m3 and m1 (if requested).

TODO add formula.

Parameters:

Name Type Description Default
alm2 NDArray[complex128]

Spherical harmonic co-efficients in l2, m2.

required
alm3 NDArray[complex128]

Spherical harmonic co-efficients in l3, m3.

required
l1max int

Maximum l1 to compute co-efficients up to.

required
sum_m1 bool

If True, also sum over m1, otherwise return m1-modes for the m1 range specified, by default False

False
absm1_lower int | None

Lower limit in |m1| to use, by default None, ie. |m1| >= 0.

None
absm1_upper int | None

Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

None

Returns:

Type Description
NDArray[complex128]

m1-mode alm1 projector as a numpy array of shape (Nm1, l1max+1) if sum_m1 is False or (l1max+1,) is sum_m1 is True.

Source code in src/serval/gaunt/core.py
def gaunt_dot23(
    alm2: npt.NDArray[np.complex128],
    alm3: npt.NDArray[np.complex128],
    l1max: int,
    sum_m1: bool = False,
    absm1_lower: int | None = None,
    absm1_upper: int | None = None,
) -> npt.NDArray[np.complex128]:
    """Compute a projector for alm1 for the integral of the triple product
    of spherical harmonics by computing the Gaunt co-efficients in-place
    and sum-producting over l2, m2, l3, m3 and m1 (if requested).

    TODO add formula.

    Parameters
    ----------
    alm2 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l2, m2.
    alm3 : npt.NDArray[np.complex128]
        Spherical harmonic co-efficients in l3, m3.
    l1max : int
        Maximum l1 to compute co-efficients up to.
    sum_m1 : bool, optional
        If True, also sum over m1, otherwise return m1-modes for the m1
        range specified, by default False
    absm1_lower : int | None, optional
        Lower limit in |m1| to use, by default None, ie. |m1| >= 0.
    absm1_upper : int | None, optional
        Upper limit in |m1| to use, by default None, ie. |m1| <= m1max = l1max.

    Returns
    -------
    npt.NDArray[np.complex128]
        m1-mode alm1 projector as a numpy array of shape (Nm1, l1max+1)
        if sum_m1 is False or (l1max+1,) is sum_m1 is True.
    """
    if absm1_lower is None:
        absm1_lower = 0
    if absm1_upper is None:
        absm1_upper = l1max + 1
    nm1 = len(_gaunt.m1_indexing(l1max, absm1_lower, absm1_upper)[0])
    result = np.zeros((nm1, l1max + 1), dtype=np.complex128)
    _gaunt.inplace_dot23(alm2, alm3, result, l1max, absm1_lower, absm1_upper)
    if sum_m1:
        return np.sum(result, axis=0)
    else:
        return result

gaunts_coo(l1max, l2max, l3max)

Compute a sparse representation of all Gaunt co-efficients up to maximum harmonic degrees l1max, l2max and l3max.

Parameters:

Name Type Description Default
l1max int

Maximum harmonic degree l1.

required
l2max int

Maximum harmonic degree l2.

required
l3max int

Maximum harmonic degree l3.

required

Returns:

Type Description
COO

COO formatted sparse array of the Gaunt co-efficients.

Source code in src/serval/gaunt/core.py
def gaunts_coo(l1max: int, l2max: int, l3max: int) -> sparse.COO:
    """Compute a sparse representation of all Gaunt co-efficients up
    to maximum harmonic degrees l1max, l2max and l3max.

    Parameters
    ----------
    l1max : int
        Maximum harmonic degree l1.
    l2max : int
        Maximum harmonic degree l2.
    l3max : int
        Maximum harmonic degree l3.

    Returns
    -------
    sparse.COO
        COO formatted sparse array of the Gaunt co-efficients.
    """
    coords, data, shape = _gaunt.gaunts_coo(l1max, l2max, l3max)
    coords = np.array(coords)
    coords = coords.view(dtype=coords.dtype[0]).reshape(-1, 6).T
    data = np.array(data)
    return sparse.COO(
        coords=coords,
        data=data,
        shape=shape,
        has_duplicates=False,
        sorted=True,
        fill_value=0.0,
    )

gaunts_l1m1_coo(l1, absm1, l1max, l2max, l3max)

Compute a sparse representation of all Gaunt co-efficients with given harmonic degree l1 and harmonic order |m1|, up to maximum harmonic degrees l1max, l2max and l3max. The shape of the coordinates is the same as for the full gaunt coeffcients (gaunts_coo). The only difference is that the first two indices are l1 and m1, set to the input values.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
absm1 int

Harmonic order |m1|.

required
l1max int

Maximum harmonic degree l1.

required
l2max int

Maximum harmonic degree l2.

required
l3max int

Maximum harmonic degree l3.

required

Returns:

Type Description
COO

COO formatted sparse array of the Gaunt co-efficients.

Source code in src/serval/gaunt/core.py
def gaunts_l1m1_coo(
    l1: int, absm1: int, l1max: int, l2max: int, l3max: int
) -> sparse.COO:
    """Compute a sparse representation of all Gaunt co-efficients with given harmonic degree
    l1 and harmonic order |m1|, up to maximum harmonic degrees l1max, l2max and l3max. The shape
    of the coordinates is the same as for the full gaunt coeffcients (gaunts_coo).
    The only difference is that the first two indices are l1 and m1, set to the input values.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    absm1 : int
        Harmonic order |m1|.
    l1max : int
        Maximum harmonic degree l1.
    l2max : int
        Maximum harmonic degree l2.
    l3max : int
        Maximum harmonic degree l3.

    Returns
    -------
    sparse.COO
        COO formatted sparse array of the Gaunt co-efficients.
    """
    # Why is l1max needed...?
    coords, data, shape = _gaunt.gaunts_l1m1_coo(l1, absm1, l1max, l2max, l3max)
    coords = np.array(coords)
    coords = coords.view(dtype=coords.dtype[0]).reshape(-1, 6).T
    data = np.array(data)
    return sparse.COO(
        coords=coords,
        data=data,
        shape=shape,
        has_duplicates=False,
        sorted=True,
        fill_value=0.0,
    )

integrator12_contract3(int12, contract3)

(m1 l3 m3) (l3 m3' m3) -> (m1 l3 m3')

Source code in src/serval/gaunt/core.py
def integrator12_contract3(
    int12: npt.NDArray[np.complex128], contract3: npt.NDArray[np.complex128]
) -> npt.NDArray[np.complex128]:
    """(m1 l3 m3) (l3 m3' m3) -> (m1 l3 m3')"""
    # Check m3' axis smaller than m3 axis
    if int12.ndim == 3:  # Has no frequency axis
        result = np.zeros(
            (int12.shape[0], contract3.shape[0], contract3.shape[1]),
            dtype=np.complex128,
        )
        _contract.integrator12_contract3(int12, contract3, result)
        return result
    elif int12.ndim == 4:  # Has frequency axis
        result = np.zeros(
            (int12.shape[0] * int12.shape[1], contract3.shape[0], contract3.shape[1]),
            dtype=np.complex128,
        )
        _contract.integrator12_contract3(
            int12.reshape((-1,) + int12.shape[2:]), contract3, result
        )
        return result.reshape(
            (int12.shape[0], int12.shape[1], contract3.shape[0], contract3.shape[1])
        )
    else:
        raise ValueError

single_gaunt(l1, l2, l3, m1, m2)

Computes a single Gaunt co-efficient for given harmonic degrees and orders. This uses Wigner-3j family algorithms so it is not efficient for computing many Gaunts co-efficients. Primarily for testing purposes m3 is determined by m1 + m2 + m3 = 0.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m1 int

Harmonic order m1.

required
m2 int

Harmonic order m2.

required

Returns:

Type Description
float

Computed Gaunt co-efficient.

Source code in src/serval/gaunt/core.py
def single_gaunt(l1: int, l2: int, l3: int, m1: int, m2: int) -> float:
    """Computes a single Gaunt co-efficient for given harmonic degrees
    and orders. This uses Wigner-3j family algorithms so it is not
    efficient for computing many Gaunts co-efficients. Primarily for
    testing purposes m3 is determined by m1 + m2 + m3 = 0.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m1 : int
        Harmonic order m1.
    m2 : int
        Harmonic order m2.

    Returns
    -------
    float
        Computed Gaunt co-efficient.
    """
    triangle = abs(l1 - l3) <= l2 <= abs(l1 + l3)
    parity = (l1 + l2 + l3) % 2 != 0
    if (
        not triangle
        or l1 < 0
        or l2 < 0
        or l3 < 0
        or abs(m1) > l1
        or abs(m2) > l2
        or abs(m1 + m2) > l3
        or parity      # parity condition: the sum of the harmonic degrees must be even
    ):
        return 0.0
    return _gaunt.single_gaunt(l1, l2, l3, m1, m2)

wigner_3jj(l2, l3, m2, m3)

Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3 for harmonic degrees m1, m2, m3 = -m1 -m2.

Parameters:

Name Type Description Default
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m2 int

Harmonic order m2.

required
m3 int

Harmonic order m3.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic degree l1min and array of Wigner-3j values of length l1max - l1min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jj(
    l2: int, l3: int, m2: int, m3: int
) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3
    for harmonic degrees m1, m2, m3 = -m1 -m2.

    Parameters
    ----------
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m2 : int
        Harmonic order m2.
    m3 : int
        Harmonic order m3.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic degree l1min and array of Wigner-3j values of
        length l1max - l1min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    # In case there is an error in the result, pyshtools Wigner3j can also be used.
    # However, it carries extra zeros that need to be removed:
    # Wigner_3j = pysh.utils.Wigner3j(l2, l3, -m2 - m3, m2, m3)[0]
    # lmin = max(abs(l2 - l3), abs(-m2 -m3))
    # lmax = l2 + l3
    # lnum = lmax - lmin + 1
    # return lmin, Wigner_3j[: lnum + 1 :]
    if m2 == 0 and m3 == 0:
        return wigner_3jj_000(l2, l3)
    if l2 < 0 or l3 < 0 or abs(m2) > l2 or abs(m3) > l3:
        return (-1, np.array([]))
    result = _wigner.wigner_3jj_nochecks(l2, l3, m2, m3)
    return result[0], np.array(result[1])

wigner_3jj_000(l2, l3)

Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3 for harmonic degrees m1 = m2 = m3 = 0.

Parameters:

Name Type Description Default
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic order l1min and array of Wigner-3j values of length l1max - l1min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jj_000(l2: int, l3: int) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of non-zero Wigner-3j terms for harmonic degrees l2 and l3
    for harmonic degrees m1 = m2 = m3 = 0.

    Parameters
    ----------
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic order l1min and array of Wigner-3j values of
        length l1max - l1min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    if l2 < 0 or l3 < 0:
        return (-1, np.array([]))
    result = _wigner.wigner_3jj_000_nochecks(l2, l3)
    return result[0], np.array(result[1])

wigner_3jm(l1, l2, l3, m1)

Compute the family of Wigner-3j terms for harmonic degrees l1, l2 and l3 all non-zero terms with harmonic order m1.

Parameters:

Name Type Description Default
l1 int

Harmonic degree l1.

required
l2 int

Harmonic degree l2.

required
l3 int

Harmonic degree l3.

required
m1 int

Harmonic order m1.

required

Returns:

Type Description
tuple[int, NDArray[float64]]

Tuple of first non-zero harmonic order m2min and array of Wigner-3j values of length l2max - l2min. If there are no non-zero elements of the Wigner-3j family, returns (-1, np.array([])).

Source code in src/serval/gaunt/core.py
def wigner_3jm(
    l1: int, l2: int, l3: int, m1: int
) -> tuple[int, npt.NDArray[np.float64]]:
    """Compute the family of Wigner-3j terms for harmonic degrees l1, l2 and l3
    all non-zero terms with harmonic order m1.

    Parameters
    ----------
    l1 : int
        Harmonic degree l1.
    l2 : int
        Harmonic degree l2.
    l3 : int
        Harmonic degree l3.
    m1 : int
        Harmonic order m1.

    Returns
    -------
    tuple[int, npt.NDArray[np.float64]]
        Tuple of first non-zero harmonic order m2min and array of Wigner-3j values of
        length l2max - l2min. If there are no non-zero elements of the Wigner-3j family,
        returns (-1, np.array([])).
    """
    triangle = abs(l1 - l3) <= l2 <= abs(l1 + l3)
    if (
        not triangle
        or l1 < 0
        or l2 < 0
        or l3 < 0
        or abs(m1) > l1
        or not (abs(l1 - l3) < l2 < l1 + l3)
    ):
        return (-1, np.array([]))
    result = _wigner.wigner_3jm_nochecks(l1, l2, l3, m1)
    return result[0], np.array(result[1])

Gaunt.cpp

These are the c++ functions:

Wigner.cpp

These are the c++ functions:

\[ E = mc^2 \]