# This file is part of PyCosmo, a multipurpose cosmology calculation tool in Python.
#
# Copyright (C) 2013-2021 ETH Zurich, Institute for Particle and Astrophysics and SIS
# ID.
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
import functools
import hashlib
import os
import sys
import textwrap
import types
import warnings
import dill
import numpy as np
from PyCosmo import core_file_handling
from PyCosmo.Background import Background, _call
from PyCosmo.BoltzmannSolver.LinearPerturbationBoltzmann import (
LinearPerturbationBoltzmann,
)
from PyCosmo.LinearPerturbationApprox import LinearPerturbationApprox
from sympy2c import __version__ as sympy2c_version
from sympy2c.compiler import base_cache_folder, load_module
from sympy2c.utils import get_platform
from ._Util import get_active_branch_name
from .config import INFO, check_flattened, load_parameters_ini
from .ini_handling import Bunch
from .LinearPerturbationTable import LinearPerturbationTable
from .NonLinearPerturbationHaloFit import NonLinearPerturbationHaloFit
from .NonLinearPerturbationMead import NonLinearPerturbationMead
from .NonLinearPerturbationTable import NonLinearPerturbationTable
from .Projection import Projection
HERE = os.path.dirname(os.path.abspath(__file__))
def make_pickable(checks):
detached_checks = [
types.FunctionType(check.__code__, __builtins__) for check in checks
]
return detached_checks
[docs]class Cosmo(object):
"""
All of main functionalities of PyCosmo are managed by the main (Cosmo) class,
which links to the other classes where the bulk of the calculations is
performed.
"""
_DEFAULT_PARAM_FILE = os.path.join(HERE, "config", "default.ini")
def __init__(self, *, model_config=None):
"""Initiates the cosmo class
:param paramfile:
"""
if model_config is None:
raise ValueError(
"please use PyCosmo.build to create an instance of {}".format(
self.__class__
)
)
self.model_config = model_config
core_equations_files = model_config.main.core_equations_files
key_0 = "_".join(map(str, sympy2c_version))
key_1 = ""
for i, core_equations_file in enumerate(core_equations_files):
if not core_equations_file.startswith(
"."
) and not core_equations_file.startswith("/"):
core_equations_file = os.path.join(HERE, core_equations_file)
core_equations_files[i] = core_equations_file
key_1 += hashlib.md5(
open(core_equations_file, "rb").read() + str(self.model_config).encode()
).hexdigest()[:6]
self.core_equations_files = core_equations_files
print(self.model_config)
self.params = Bunch({})
self._cache_file_name = "{}__{}.pkl".format(key_0, key_1)
self._load_wrapper()
self.load_params()
def _load_wrapper(self):
from . import version
branch = get_active_branch_name() or ""
platform = get_platform()
folder = os.path.join(
base_cache_folder(), "PyCosmo", platform, branch, version.replace(".", "_")
)
if not os.path.exists(folder):
try:
os.makedirs(folder)
except IOError:
# might be triggered due to race condition when run in parallel
assert os.path.exists(folder), "this must not happen"
cache_file = os.path.join(folder, self._cache_file_name)
wrapper_loaded = False
if os.path.exists(cache_file):
print("cache hit at", cache_file)
data = dill.load(open(cache_file, "rb"))
folder = data["folder"]
checks = data["checks"]
enrich_params = data["enrich_params"]
try:
print("load wrapper from", folder)
wrapper = load_module(folder)
wrapper_loaded = True
except ImportError:
wrapper = None
if not wrapper_loaded:
module, checks, enrich_params = core_file_handling.load_core_files(
self.core_equations_files, self.model_config
)
compilation_flags = self.model_config.main.compilation_flags.split(" ")
wrapper = module.compile_and_load(compilation_flags=compilation_flags)
data = {
"folder": wrapper._folder,
"module": module,
"model_config": self.model_config,
"checks": make_pickable(checks),
"enrich_params": enrich_params and make_pickable([enrich_params])[0],
}
with open(cache_file, "wb") as fh:
dill.dump(data, fh)
print("new cache entry at", cache_file)
self.model_parameter_checks = checks
self.model_enrich_params = enrich_params
self._wrapper = wrapper
self._cache_file = cache_file
@staticmethod
def recompile_from_cli():
assert len(sys.argv) == 2, "need name to pkl file"
cache_file = sys.argv[1]
try:
data = dill.load(open(cache_file, "rb"))
except Exception as e:
print("could not read {}: {}".format(cache_file, e))
sys.exit(1)
model_config = data["model_config"]
compilation_flags = model_config.main.compilation_flags.split(" ")
module = data["module"]
wrapper = module.recompile_and_load(
compilation_flags=compilation_flags, force=True
)
data = {
"folder": wrapper._folder,
"module": module,
"model_config": model_config,
"checks": data["checks"],
"enrich_params": data["enrich_params"],
}
with open(cache_file, "wb") as fh:
dill.dump(data, fh)
print("new cache entry at", cache_file)
[docs] def load_params(self, paramfile=None):
r"""
Loads parameter file and additional user-defined parameters
to initialize the Cosmo object
:param paramfile: path to parameter file
"""
if paramfile is None:
paramfile = self.model_config.main.default_ini_file
user_parameters, model_specific_parameters = load_parameters_ini(paramfile)
check_flattened(user_parameters)
self.user_parameters = user_parameters
self.model_specific_parameters = model_specific_parameters
self.paramfile = paramfile
self._reset()
[docs] def set(self, **kwargs):
"""
Changes the configuration parameters of the PyCosmo instance. This will also
recalculate the derived quantities.
:param kwargs: keyword arguments for the input parameters to change.
.. Warning ::
Always change parameters of an instance of PyCosmo using the set function.
Do not change variables directly!
:return: If no keywords are passed then a list of options is printed
Example:
.. code-block:: python
cosmo.set(omega_m=0.23)
"""
model_specific_params = {
k: v for (k, v) in kwargs.items() if k in self.model_specific_parameters
}
self.model_specific_parameters.update(model_specific_params)
general_user_params = {
k: v for (k, v) in kwargs.items() if k not in self.model_specific_parameters
}
check_flattened(general_user_params)
self.user_parameters.update(general_user_params)
self._reset()
if len(general_user_params) == 0:
print("Current status:")
self.print_params(inc_derived=False)
print("")
else:
print("Parameters updated")
def __getstate__(self):
dd = self.__dict__.copy()
dd["model_parameter_checks"] = dill.dumps(dd["model_parameter_checks"])
dd["model_enrich_params"] = dill.dumps(dd["model_enrich_params"])
return dd
def __setstate__(self, dd):
dd["model_parameter_checks"] = dill.loads(dd["model_parameter_checks"])
dd["model_enrich_params"] = dill.loads(dd["model_enrich_params"])
self.__dict__.update(dd)
self._reset()
def _reset(self):
"""
Resets the internal data in the instance
:param paramfile: (optional) the name of the param file to use.
"""
# check_flattened(self.user_parameters)
self.derived_parameters = derived_parameters = self._enrich_params_pre(
self.user_parameters
)
merged = self.user_parameters.copy()
merged.update(derived_parameters)
merged.update(self.model_specific_parameters)
self.params.clear()
self.params.update(merged)
params = self.params
messages = []
for check in self.model_parameter_checks:
msg = check(params)
if msg:
messages.append(msg)
if messages:
raise ValueError(
"the model did not accept the given parameters: {}".format(
", ".join(messages)
)
)
self.reset_wrapper_globals()
self.derived_parameters = derived_parameters = self._enrich_params_post(
self.params
)
merged = self.params.copy()
merged.update(derived_parameters)
self.params.clear()
self.params.update(merged)
self.reset_wrapper_globals()
params = self.params
recomb = params.recomb
if recomb == "recfast++":
print("Recombination set to Recfast++")
from PyCosmo._Recombination import Recombination
self.rec = Recombination(params)
elif recomb == "class":
print("Recombination set to CLASS")
from PyCosmo._RecombinationClass import RecombinationClass
self.rec = RecombinationClass(params)
elif recomb == "cosmics":
print("Recombination set to COSMICS")
from PyCosmo._RecombinationCosmics import RecombinationCosmics
self.rec = RecombinationCosmics(params, self)
elif recomb is None:
print("Ignoring recombination.")
self.rec = None
else:
raise ValueError("invalid value '{}' for recombination".format(recomb))
self.background = background = Background(self, params, self.rec, self._wrapper)
if params.pk_type == "boltz":
a_table = 10 ** np.linspace(-15, 0, params.table_size)
wrapper = self._wrapper
wrapper.set_c_s_interp_values(a_table, background.cs(a_table))
wrapper.set_taudot_interp_values(a_table, background.taudot(a_table))
wrapper.set_eta_interp_values(a_table, background.eta(a_table))
if params.pk_type in ("EH", "BBKS", "BBKS_CCL"):
if params.pk_norm == "A_s":
raise ValueError(
"A_s normalization not available for fitting functions"
)
self.lin_pert = LinearPerturbationApprox(self)
elif params.pk_type == "boltz":
self.lin_pert = LinearPerturbationBoltzmann(self)
if params.pk_nonlin_type is not None:
params.pk_nonlin_type = None
warnings.warn(
"disabled non-linear perturbations which are not working "
"with the Boltzmann solver yet",
)
else:
# Todo raise an error
raise ValueError("unknown pk_type value {} used".format(params.pk_type))
self.lin_pert = None
# TODO: remove duplicate ?
self.lin_pert_tab = LinearPerturbationTable(self.lin_pert)
if params.pk_nonlin_type == "halofit" or params.pk_nonlin_type == "rev_halofit":
assert (
params.pk_type != "boltz"
), "Halofit model not supported by the Boltzmann solver yet!"
self.nonlin_pert = NonLinearPerturbationHaloFit(self)
elif params.pk_nonlin_type == "mead":
assert (
params.pk_type != "boltz"
), "Mead model not supported by the Boltzmann solver yet!"
self.nonlin_pert = NonLinearPerturbationMead(self)
elif params.pk_nonlin_type is None:
self.nonlin_pert = None
else:
raise ValueError(
'unknown pk_nonlin_type "{}"'.format(params.pk_nonlin_type)
)
self.lin_pert_tab = LinearPerturbationTable(self.lin_pert)
if self.nonlin_pert is None:
self.projection = FakeProjection()
return
self.nonlin_pert_tab = NonLinearPerturbationTable(self.nonlin_pert, params)
if params.tabulation:
self.projection = Projection(
params, self.background, self.lin_pert_tab, self.nonlin_pert_tab
)
else:
self.projection = Projection(
params, self.background, self.lin_pert, self.nonlin_pert
)
def reset_wrapper_globals(self):
params = self.params
g = {}
for name in self._wrapper.get_globals().keys():
if hasattr(params, name):
g[name] = getattr(params, name)
self._wrapper.set_globals(**g)
def _enrich_params_pre(self, params):
"""
Setting basic constants and some derived quantities
Initialising the basic cosmological parameters
"""
derived_parameters = Bunch()
if "cosmo_nudge" not in params.keys():
nudge = [1.0, 1.0, 1.0] # no nudge
else:
nudge = list(params.cosmo_nudge)
if nudge != [1.0, 1.0, 1.0]:
print(
"Warning: nudges to H0, omega_gamma, omega_neu introduced "
"- for debugging purposes only"
)
derived_parameters.H0 = (
100.0 * params.h * nudge[0]
) # Hubble constant [km/s/Mpc]
derived_parameters.rh = (
params.c / derived_parameters.H0 * params.h
) # Hubble radius (=c/H0) at z=0 [h^-1 Mpc]
# critical density at z=0 [h^2 M_sun Mpc^-3]
derived_parameters.rho_crit = (
3.0
* derived_parameters.H0 ** 2
/ (8.0 * np.pi * params.G)
* 1e6
* params.mpc
/ params.msun
/ params.h ** 2
)
# TODO: check omega_gamma and omega_neu expressions
# omega_photon (see Dodelson eq. 2.70) ** express in terms of H0? **)
# params.omega_gamma = 2.470109245e-5 * (params.Tcmb / 2.725)**4 / params.h**2 *
# nudge[1]
# Uwe: put this into core file(s)? depends om model_specific_parameters
if params.N_massive_nu != 0:
derived_parameters.N_eff = (
params.N_massless_nu
+ self.model_specific_parameters.T_mnu ** 4
* (4 / 11) ** -(4 / 3)
* params.N_massive_nu
)
derived_parameters.rho_gamma_eV = (
(3 * 100 ** 2 / (8 * np.pi * params.G))
* params.hbar ** 3
* (1 / (params.mpc * 1e-3)) ** 2
* (1 / (params.evc2))
* (params.c * 1e3) ** 3
)
derived_parameters.T_0 = params.Tcmb * ((4 / 11) ** (1 / 3))
derived_parameters.rho_mnu_eV = (
3.0
* derived_parameters.H0 ** 2
* (params.hbar) ** 3
* (params.c * 1e3) ** 3
/ (8 * np.pi * params.G)
* (1 / (params.evc2))
* (1 / (params.mpc * 1e-3)) ** 2
/ (params.kb * derived_parameters.T_0) ** 4
)
derived_parameters.omega_gamma_prefactor = (
np.pi ** 2
/ 15
* (2.725 * params.kb) ** 4
/ (derived_parameters.rho_gamma_eV)
)
derived_parameters.omega_gamma = (
derived_parameters.omega_gamma_prefactor
* (params.Tcmb / 2.725) ** 4
/ params.h ** 2
* nudge[1]
)
derived_parameters.omega_neu = (
params.N_massless_nu
* 7.0
/ 8.0
* (4.0 / 11.0) ** (4.0 / 3.0)
* derived_parameters.omega_gamma
* nudge[2]
) # omega for massless neutrino
derived_parameters.omega_r = (
derived_parameters.omega_gamma + derived_parameters.omega_neu
) # omega_radiation
# suppress_rad
if params.suppress_rad:
derived_parameters.omega_r = 0.0
if params.omega_suppress:
# this ignores curvature - wrong but used by some codes which does not
# account for omega_r in omega
derived_parameters.omega = 1.0
derived_parameters.omega_k = 0.0
derived_parameters.omega_r = 0.0
print(
"Cosmo_add_constants: Warning: curvature suppressed - "
"for testing purposes only"
)
# TODO: curvature suppression perhaps needs to be removed at some point
if params.pk_type == "boltz":
raise ValueError(
"Behaviour of the Boltzmann solver undetermined for suppressed"
" radiation!"
)
if self.model_enrich_params is not None:
self.model_enrich_params(params, derived_parameters)
return derived_parameters
def _enrich_params_post(self, params):
"""
Setting basic constants and some derived quantities
Initialising the basic cosmological parameters
"""
derived_parameters = Bunch()
derived_parameters.omega_nu_m = getattr(self._wrapper, "omega_nu_m")(1.0)
derived_parameters.omega_k = _call(self._wrapper, "omega_k")
if params.flat_universe:
derived_parameters.omega_l = _call(self._wrapper, "omega_l_a", 1.0)
else:
if params.omega_l is None:
raise ValueError(
"for a non-float universe you also must provide a value for omega_l"
)
derived_parameters.omega_l = params.omega_l
if params.pk_type == "boltz":
raise NotImplementedError(
"Boltzmann solver only implemented for flat cosmologies"
)
derived_parameters.omega = (
params.omega_m
+ params.omega_r
+ params.omega_l
+ derived_parameters.omega_nu_m
+ derived_parameters.omega_k
) # correct expression
derived_parameters.omega_dm = (
params.omega_m - params.omega_b
) # DM density (z=0)
if derived_parameters.omega_k == 0.0:
derived_parameters.sqrtk = 1.0
else:
derived_parameters.sqrtk = np.sqrt(abs(derived_parameters.omega_k))
if params.omega_r > 0.0:
derived_parameters.a_eq = (
params.omega_r / params.omega_m
) # matter-radiation equality
derived_parameters.z_eq = 1.0 / derived_parameters.a_eq - 1.0
else:
derived_parameters.a_eq = np.nan
derived_parameters.z_eq = np.nan
# derived_parameters.H_0 = 100 * params.h
omh2 = params.omega_m * params.h ** 2
derived_parameters._sigma_27 = params.Tcmb / 2.7 # Normalised CMB temperature
# Wave vector values
derived_parameters._k_eq = (
7.46e-2 * omh2 * derived_parameters._sigma_27 ** -2
) # Eq. (3), units: Mpc^-1
# Redshift values
# TODO: this differs from z_eq in the cosmo class. should probably rename
# it to avoid confusion
# Redshift at matter-radiation equality, Eq. (2)
derived_parameters._z_equality = (
2.5e4 * omh2 * derived_parameters._sigma_27 ** -4
)
b_1 = 0.313 * omh2 ** -0.419 * (1.0 + 0.607 * omh2 ** 0.674)
b_2 = 0.238 * omh2 ** 0.223
derived_parameters._z_drag = (
1291.0
* (omh2 ** 0.251)
/ (1.0 + 0.659 * omh2 ** 0.828)
* (1.0 + b_1 * (params.omega_b * params.h ** 2) ** b_2)
) # Redshift at drag epoch, Eq. (4)
# Comoving distance values
derived_parameters._R_drag = self._photon2baryon_dens(
params, derived_parameters, derived_parameters._z_drag
)
derived_parameters._R_eq = self._photon2baryon_dens(
params, derived_parameters, derived_parameters._z_equality
)
if derived_parameters._R_eq == 0:
derived_parameters._sound_horiz = np.nan
else:
derived_parameters._sound_horiz = (
2
/ (3 * derived_parameters._k_eq)
* np.sqrt(6 / derived_parameters._R_eq)
* np.log(
(
np.sqrt(1.0 + derived_parameters._R_drag)
+ np.sqrt(derived_parameters._R_drag + derived_parameters._R_eq)
)
/ (1.0 + np.sqrt(derived_parameters._R_eq))
)
) # Eq. (6)
if derived_parameters.omega_l > 0 and params.wa == 0.0:
derived_parameters.a_eq2 = (
params.omega_m / derived_parameters.omega_l
) ** (1 / 3.0)
derived_parameters.z_eq2 = 1.0 / derived_parameters.a_eq2 - 1.0
else:
derived_parameters.a_eq2 = np.nan
derived_parameters.z_eq2 = np.nan
if self.model_enrich_params is not None:
self.model_enrich_params(params, derived_parameters)
return derived_parameters
def _photon2baryon_dens(self, params, derived_parameters, z):
"""Returns the ratio of the baryon to photon momentum density R
as defined in Eisenstein & Hu, 1998, ApJ, 511, 5, Equation (5)
for redshift z
R = 31.5 omega_b h^2 sigma_27^-4 (z/10^3)^-1"""
R = (
31.5
* params.omega_b
* params.h ** 2
* derived_parameters._sigma_27 ** (-4)
* (z / 10 ** 3) ** (-1.0)
)
return R
[docs] def print_params(self, inc_derived=True, inc_internal=True, file=None):
"""
Prints the parameters of PyCosmo instance.
:param inc_derived: prints derived parameters [True or False]
:param inc_internal: prints internal parameterss (e.g. for lin_pert)
[True or False]
"""
print_ = functools.partial(print, file=file)
INDENT = " "
for section, parameters in INFO.items():
if section.startswith("nonlinear_perturbations:"):
if section != "nonlinear_perturbations:{}".format(
self.params.pk_nonlin_type
):
continue
if section.startswith("internal:") and not inc_internal:
continue
print_("----", (section + " ").ljust(70, "-"))
print_()
for name, extra_text in parameters.items():
full_text = name
if extra_text:
full_text += " ({})".format(extra_text)
lines = textwrap.wrap(full_text, 45)
for line in lines[:-1]:
print_(INDENT + line)
value = self.params[name]
if value is None:
value = "not set"
print_(INDENT + "{:45s}: {}".format(lines[-1], value))
print_()
# Uwe: review if this output is still up to date:
DERIVED_INFO = dict(
_sigma_27="Dimensionless CMB temperature [1]",
_z_equality="Redshift of matter-radiation equlity [1]",
_z_drag="Redshift of drag epoch [1]",
_k_eq="Particle horizon at equality epoch [Mpc-1]",
_sound_horiz="Sound horizon [Mpc-1]",
H0="Hubble constant (H0) [km/s/Mpc]",
rh="Hubble radius (rh) [Mpc/h]",
rho_crit="Critical Density (rho_crit) [h^2 M_sun/Mpc^3]",
omega_dm="Dark Matter density (omega_dm) [1]",
omega_gamma="Photon density (omega_gamma) [1]",
omega_neu="Neutrino density (omega_neu) [1]",
omega_nu_m="Massive neutrino density (Omega_m_nu) [1]",
omega_k="Curvature density (omega_k) [1]",
omega="Total density (omega) [1]",
a_eq="Dark energy-matter equality [1]",
z_eq="Dark energy-matter equality as redshift [1]",
a_eq2="Dark energy-radiation equality [1]",
z_eq2="Dark energy-radiation equality as redshift [1]",
)
if inc_derived:
print_("----", ("Derived quantities").ljust(70, "-"))
print_()
for key, value in self.derived_parameters.items():
extra_text = DERIVED_INFO.get(key)
if extra_text:
key += " ({})".format(extra_text)
lines = textwrap.wrap(key, 45)
for line in lines[:-1]:
print_(INDENT + line)
if value is None:
value = "not set"
print_(INDENT + "{:45s}: {}".format(lines[-1], value))
print_()
if self.model_specific_parameters:
model_name = self.model_config.main.model
print_(
"----",
("Model {} specific quantities".format(model_name)).ljust(70, "-"),
)
print_()
for key, value in self.model_specific_parameters.items():
if value is None:
value = "not set"
print_(INDENT + "{:45s}: {}".format(key, value))
print_()
if hasattr(self, "lin_pert"):
if hasattr(self.lin_pert, "print_params"):
self.lin_pert.print_params()
[docs]class FakeProjection:
def __getattr__(self, *a):
raise ValueError("you must set pk_nonlin_type to access projections")
def __str__(self):
return "None"
def __getstate__(self):
return None
def __setstate__(self, dd):
pass