Source code for celerite2.terms

# -*- coding: utf-8 -*-

__all__ = [
    "Term",
    "TermSum",
    "TermProduct",
    "TermDiff",
    "TermConvolution",
    "RealTerm",
    "ComplexTerm",
    "SHOTerm",
    "Matern32Term",
    "RotationTerm",
    "OriginalCeleriteTerm",
]

from functools import wraps
from itertools import chain, product

import numpy as np

import celerite2.driver as driver


[docs] class Term: """The abstract base "term" that is the superclass of all other terms Subclasses should overload at least the :func:`Term.get_coefficients` method. """ def __add__(self, b): return TermSum(self, b) def __mul__(self, b): return TermProduct(self, b) @property def terms(self): return [self]
[docs] def get_coefficients(self): """Compute and return the coefficients for the celerite model This should return a 6 element tuple with the following entries: .. code-block:: python (ar, cr, ac, bc, cc, dc) .. note:: All of the returned objects must be arrays, even if they only have one element. """ raise NotImplementedError("subclasses must implement this method")
[docs] def get_value(self, tau): """Compute the value of the kernel as a function of lag Args: tau (shape[...]): The lags where the kernel should be evaluated. """ tau = np.abs(np.atleast_1d(tau)) ar, cr, ac, bc, cc, dc = self.get_coefficients() k = np.zeros_like(tau) tau = tau[..., None] if len(ar): k += np.sum(ar * np.exp(-cr * tau), axis=-1) if len(ac): arg = dc * tau k += np.sum( np.exp(-cc * tau) * (ac * np.cos(arg) + bc * np.sin(arg)), axis=-1, ) return k
[docs] def get_psd(self, omega): """Compute the value of the power spectral density for this process Args: omega (shape[...]): The (angular) frequencies where the power should be evaluated. """ w2 = np.atleast_1d(omega) ** 2 ar, cr, ac, bc, cc, dc = self.get_coefficients() psd = np.zeros_like(w2) w2 = w2[..., None] if len(ar): psd += np.sum(ar * cr / (cr**2 + w2), axis=-1) if len(ac): w02 = cc**2 + dc**2 psd += np.sum( ((ac * cc + bc * dc) * w02 + (ac * cc - bc * dc) * w2) / (w2**2 + 2.0 * (cc * cc - dc * dc) * w2 + w02 * w02), axis=-1, ) return np.sqrt(2 / np.pi) * psd
[docs] def to_dense(self, x, diag): """Evaluate the dense covariance matrix for this term Args: x (shape[N]): The independent coordinates of the data. diag (shape[N]): The diagonal variance of the system. """ K = self.get_value(x[:, None] - x[None, :]) K[np.diag_indices_from(K)] += diag return K
[docs] def get_celerite_matrices( self, x, diag, *, c=None, a=None, U=None, V=None ): """Get the matrices needed to solve the celerite system Pre-allocated arrays can be provided to the Python interface to be re-used for multiple evaluations. .. note:: In-place operations are not supported by the modeling extensions. Args: x (shape[N]): The independent coordinates of the data. diag (shape[N]): The diagonal variance of the system. a (shape[N], optional): The diagonal of the A matrix. U (shape[N, J], optional): The first low-rank matrix. V (shape[N, J], optional): The second low-rank matrix. P (shape[N-1, J], optional): The regularization matrix used for numerical stability. Raises: ValueError: When the inputs are not valid. """ x = np.atleast_1d(x) diag = np.atleast_1d(diag) if len(x.shape) != 1: raise ValueError("'x' must be one-dimensional") if x.shape != diag.shape: raise ValueError("dimension mismatch") ar, cr, ac, bc, cc, dc = self.get_coefficients() N = len(x) Jr = len(ar) Jc = len(ac) J = Jr + 2 * Jc if c is None: c = np.empty(J) else: c.resize(J, refcheck=False) if a is None: a = np.empty(N) else: a.resize(N, refcheck=False) if U is None: U = np.empty((N, J)) else: U.resize((N, J), refcheck=False) if V is None: V = np.empty((N, J)) else: V.resize((N, J), refcheck=False) c[:Jr] = cr c[Jr::2] = cc c[Jr + 1 :: 2] = cc a, U, V = driver.get_celerite_matrices( ar, ac, bc, dc, x, diag, a, U, V ) return c, a, U, V
[docs] def dot(self, x, diag, y): """Apply a matrix-vector or matrix-matrix product Args: x (shape[N]): The independent coordinates of the data. diag (shape[N]): The diagonal variance of the system. y (shape[N] or shape[N, K]): The target of vector or matrix for this operation. """ x = np.atleast_1d(x) y = np.atleast_1d(y) if y.shape[0] != x.shape[0]: raise ValueError("Dimension mismatch") is_vector = False if y.ndim == 1: is_vector = True y = y[:, None] if y.ndim != 2: raise ValueError("'y' can only be a vector or matrix") c, a, U, V = self.get_celerite_matrices(x, diag) z = y * a[:, None] z = driver.matmul_lower(x, c, U, V, y, z) z = driver.matmul_upper(x, c, U, V, y, z) if is_vector: return z[:, 0] return z
[docs] class TermSum(Term): """A sum of multiple :class:`Term` objects The resulting kernel function is the sum of the functions and the width of the resulting low-rank representation will be the sum of widths for each of the terms. Args: *terms: Any number of :class:`Term` subclasses to add together. """ def __init__(self, *terms): if any(isinstance(term, TermConvolution) for term in terms): raise TypeError( "You cannot perform operations on an TermConvolution, it must " "be the outer term in the kernel" ) self._terms = terms @property def terms(self): return self._terms def get_coefficients(self): coeffs = (t.get_coefficients() for t in self.terms) return tuple(np.concatenate(c) for c in zip(*coeffs))
[docs] class TermProduct(Term): """A product of two :class:`Term` objects The resulting kernel function is the product of the two functions and the resulting low-rank representation will in general be wider than the sum of the two widths. Args: term1 (Term): The left term. term2 (Term): The right term. """ def __init__(self, term1, term2): int1 = isinstance(term1, TermConvolution) int2 = isinstance(term2, TermConvolution) if int1 or int2: raise TypeError( "You cannot perform operations on an TermConvolution, it must " "be the outer term in the kernel" ) self.term1 = term1 self.term2 = term2 def get_coefficients(self): c1 = self.term1.get_coefficients() c2 = self.term2.get_coefficients() # First compute real terms ar = [] cr = [] gen = product(zip(c1[0], c1[1]), zip(c2[0], c2[1])) for i, ((aj, cj), (ak, ck)) in enumerate(gen): ar.append(aj * ak) cr.append(cj + ck) # Then the complex terms ac = [] bc = [] cc = [] dc = [] # real * complex gen = product(zip(c1[0], c1[1]), zip(*(c2[2:]))) gen = chain(gen, product(zip(c2[0], c2[1]), zip(*(c1[2:])))) for i, ((aj, cj), (ak, bk, ck, dk)) in enumerate(gen): ac.append(aj * ak) bc.append(aj * bk) cc.append(cj + ck) dc.append(dk) # complex * complex gen = product(zip(*(c1[2:])), zip(*(c2[2:]))) for i, ((aj, bj, cj, dj), (ak, bk, ck, dk)) in enumerate(gen): ac.append(0.5 * (aj * ak + bj * bk)) bc.append(0.5 * (bj * ak - aj * bk)) cc.append(cj + ck) dc.append(dj - dk) ac.append(0.5 * (aj * ak - bj * bk)) bc.append(0.5 * (bj * ak + aj * bk)) cc.append(cj + ck) dc.append(dj + dk) return list(map(np.array, (ar, cr, ac, bc, cc, dc)))
[docs] class TermDiff(Term): """Take the first derivative of a term with respect to the lag Args: term (Term): The term to differentiate. """ def __init__(self, term): if isinstance(term, TermConvolution): raise TypeError( "You cannot perform operations on an TermConvolution, it must " "be the outer term in the kernel" ) self.term = term def get_coefficients(self): coeffs = self.term.get_coefficients() a, b, c, d = coeffs[2:] final_coeffs = [ -coeffs[0] * coeffs[1] ** 2, coeffs[1], a * (d**2 - c**2) + 2 * b * c * d, b * (d**2 - c**2) - 2 * a * c * d, c, d, ] return final_coeffs
[docs] class TermConvolution(Term): """A term corresponding to the integral of another term over a boxcar The process produced by this term is equivalent to the process produced by convolving the base process with a boxcar of length ``delta``. This can be useful, for example, when taking exposure time integration into account. Args: term (Term): The term describing the base process. delta (float): The width of the boxcar filter (for example, the exposure time). """ def __init__(self, term, delta): self.term = term self.delta = float(delta) def get_celerite_matrices( self, x, diag, *, c=None, a=None, U=None, V=None ): dt = self.delta ar, cr, a, b, cc, d = self.term.get_coefficients() # Real part cd = cr * dt delta_diag = 2 * np.sum(ar * (cd - np.sinh(cd)) / cd**2) # Complex part cd = cc * dt dd = d * dt c2 = cc**2 d2 = d**2 c2pd2 = c2 + d2 C1 = a * (c2 - d2) + 2 * b * cc * d C2 = b * (c2 - d2) - 2 * a * cc * d norm = (dt * c2pd2) ** 2 sinh = np.sinh(cd) cosh = np.cosh(cd) delta_diag += 2 * np.sum( ( C2 * cosh * np.sin(dd) - C1 * sinh * np.cos(dd) + (a * cc + b * d) * dt * c2pd2 ) / norm ) new_diag = diag + delta_diag return super().get_celerite_matrices(x, new_diag, c=c, a=a, U=U, V=V) def get_coefficients(self): ar, cr, a, b, c, d = self.term.get_coefficients() # Real componenets crd = cr * self.delta coeffs = [2 * ar * (np.cosh(crd) - 1) / crd**2, cr] # Imaginary coefficients cd = c * self.delta dd = d * self.delta c2 = c**2 d2 = d**2 factor = 2.0 / (self.delta * (c2 + d2)) ** 2 cos_term = np.cosh(cd) * np.cos(dd) - 1 sin_term = np.sinh(cd) * np.sin(dd) C1 = a * (c2 - d2) + 2 * b * c * d C2 = b * (c2 - d2) - 2 * a * c * d coeffs += [ factor * (C1 * cos_term - C2 * sin_term), factor * (C2 * cos_term + C1 * sin_term), c, d, ] return coeffs def get_psd(self, omega): omega = np.atleast_1d(omega) psd0 = self.term.get_psd(omega) arg = 0.5 * self.delta * omega sinc = np.ones_like(arg) m = np.abs(arg) > 0.0 sinc[m] = np.sin(arg[m]) / arg[m] return psd0 * sinc**2 def get_value(self, tau0): dt = self.delta ar, cr, a, b, c, d = self.term.get_coefficients() # Format the lags correctly tau0 = np.abs(np.atleast_1d(tau0)) tau = tau0[..., None] # Precompute some factors dpt = dt + tau dmt = dt - tau # Real parts: # tau > Delta crd = cr * dt cosh = np.cosh(crd) norm = 2 * ar / crd**2 K_large = np.sum(norm * (cosh - 1) * np.exp(-cr * tau), axis=-1) # tau < Delta crdmt = cr * dmt K_small = K_large + np.sum(norm * (crdmt - np.sinh(crdmt)), axis=-1) # Complex part cd = c * dt dd = d * dt c2 = c**2 d2 = d**2 c2pd2 = c2 + d2 C1 = a * (c2 - d2) + 2 * b * c * d C2 = b * (c2 - d2) - 2 * a * c * d norm = 1.0 / (dt * c2pd2) ** 2 k0 = np.exp(-c * tau) cdt = np.cos(d * tau) sdt = np.sin(d * tau) # For tau > Delta cos_term = 2 * (np.cosh(cd) * np.cos(dd) - 1) sin_term = 2 * (np.sinh(cd) * np.sin(dd)) factor = k0 * norm K_large += np.sum( (C1 * cos_term - C2 * sin_term) * factor * cdt, axis=-1 ) K_large += np.sum( (C2 * cos_term + C1 * sin_term) * factor * sdt, axis=-1 ) # tau < Delta edmt = np.exp(-c * dmt) edpt = np.exp(-c * dpt) cos_term = ( edmt * np.cos(d * dmt) + edpt * np.cos(d * dpt) - 2 * k0 * cdt ) sin_term = ( edmt * np.sin(d * dmt) + edpt * np.sin(d * dpt) - 2 * k0 * sdt ) K_small += np.sum(2 * (a * c + b * d) * c2pd2 * dmt * norm, axis=-1) K_small += np.sum((C1 * cos_term + C2 * sin_term) * norm, axis=-1) mask = tau0 >= dt K_small[mask] = K_large[mask] return K_small
[docs] class RealTerm(Term): r"""The simplest celerite term .. warning:: You should only use this term if you really know what you're doing because it will generally behave poorly. Check out :class:`SHOTerm` instead! This term has the form .. math:: k(\tau) = a_j\,e^{-c_j\,\tau} with the parameters ``a`` and ``c``. Strictly speaking, for a sum of terms, the parameter ``a`` could be allowed to go negative but since it is somewhat subtle to ensure positive definiteness, we recommend keeping both parameters strictly positive. Advanced users can build a custom term that has negative coefficients but care should be taken to ensure positivity. Args: a: The amplitude of the term. c: The exponent of the term. """ @staticmethod def get_test_parameters(): return dict(a=1.5, c=0.7) def __init__(self, *, a, c): self.a = float(a) self.c = float(c) def get_coefficients(self): e = np.empty(0) return np.array([self.a]), np.array([self.c]), e, e, e, e
[docs] class ComplexTerm(Term): r"""A general celerite term .. warning:: You should only use this term if you really know what you're doing because it will generally behave poorly. Check out :class:`SHOTerm` instead! This term has the form .. math:: k(\tau) = \frac{1}{2}\,\left[(a_j + b_j)\,e^{-(c_j+d_j)\,\tau} + (a_j - b_j)\,e^{-(c_j-d_j)\,\tau}\right] with the parameters ``a``, ``b``, ``c``, and ``d``. This term will only correspond to a positive definite kernel (on its own) if :math:`a_j\,c_j \ge b_j\,d_j`. Args: a: The real part of amplitude. b: The imaginary part of amplitude. c: The real part of the exponent. d: The imaginary part of exponent. """ @staticmethod def get_test_parameters(): return dict(a=1.5, b=0.7, c=0.7, d=0.5) def __init__(self, *, a, b, c, d): self.a = float(a) self.b = float(b) self.c = float(c) self.d = float(d) def get_coefficients(self): e = np.empty(0) return ( e, e, np.array([self.a]), np.array([self.b]), np.array([self.c]), np.array([self.d]), )
class handle_parameter_spec: def __init__(self, mapper): self.mapper = mapper def __call__(self, to_wrap): @wraps(to_wrap) def wrapped(target, *args, **kwargs): for param, alt in target.__parameter_spec__: all_names = set([param]) | set(name for name, _ in alt) if sum(int(n in kwargs) for n in all_names) != 1: raise ValueError( "exactly one of {0} must be defined".format( list(all_names) ) ) if param in kwargs: setattr(target, param, self.mapper(kwargs.pop(param))) else: for name, func in alt: if name in kwargs: setattr( target, param, func(target, self.mapper(kwargs.pop(name))), ) break return to_wrap(target, *args, **kwargs) return wrapped
[docs] class SHOTerm(Term): r"""A term representing a stochastically-driven, damped harmonic oscillator The PSD of this term is .. math:: S(\omega) = \sqrt{\frac{2}{\pi}} \frac{S_0\,\omega_0^4} {(\omega^2-{\omega_0}^2)^2 + {\omega_0}^2\,\omega^2/Q^2} with the parameters ``S0``, ``Q``, and ``w0``. Consult the `Celerite paper <https://doi.org/10.3847/1538-3881/aa9332>`_ or `its preprint on arXiv <https://arxiv.org/abs/1703.09710>`_ for a more detailed mathematical description. This implementation also supports the following reparameterizations that can be easier to use and reason about: 1. ``rho``, the undamped period of the oscillator, defined as :math:`\rho = 2\,\pi / \omega_0`, 2. ``tau``, the damping timescale of the process, defined as :math:`\tau = 2\,Q / \omega_0`, and 3. ``sigma``, the standard deviation of the process, defined as :math:`\sigma = \sqrt{S_0\,\omega_0\,Q}`. Args: w0: The undamped angular frequency, :math:`\omega_0` above. rho: Alternative parameterization for ``w0`` as described above. Q: The quality factor, :math:`Q` above. tau: Alternative parameterization for ``Q`` as described above. S0: :math:`S_0` in the PSD above. The power at :math:`\omega = 0` is :math:`S_0\sqrt{2/\pi}`. sigma: Alternative parameterization for ``S0`` as described above. eps (optional): A regularization parameter used for numerical stability when computing :math:`\sqrt{1-4\,Q^2}` or :math:`\sqrt{4\,Q^2-1}`. """ __parameter_spec__ = ( ("w0", (("rho", lambda self, rho: 2 * np.pi / rho),)), ("Q", (("tau", lambda self, tau: 0.5 * self.w0 * tau),)), ( "S0", (("sigma", lambda self, sigma: sigma**2 / (self.w0 * self.Q)),), ), ) @staticmethod def get_test_parameters(): return dict(sigma=1.5, tau=2.345, rho=3.4) @handle_parameter_spec(float) def __init__(self, *, eps=1e-5): self.eps = float(eps) def overdamped(self): Q = self.Q f = np.sqrt(np.maximum(1.0 - 4.0 * Q**2, self.eps)) e = np.empty(0) return ( 0.5 * self.S0 * self.w0 * Q * np.array([1.0 + 1.0 / f, 1.0 - 1.0 / f]), 0.5 * self.w0 / Q * np.array([1.0 - f, 1.0 + f]), e, e, e, e, ) def underdamped(self): Q = self.Q f = np.sqrt(np.maximum(4.0 * Q**2 - 1.0, self.eps)) a = self.S0 * self.w0 * Q c = 0.5 * self.w0 / Q e = np.empty(0) return ( e, e, np.array([a]), np.array([a / f]), np.array([c]), np.array([c * f]), ) def get_coefficients(self): return self.overdamped() if self.Q < 0.5 else self.underdamped()
[docs] class Matern32Term(Term): r"""A term that approximates a Matern-3/2 function This term is defined as .. math:: k(\tau) = \sigma^2\,\left[ \left(1+1/\epsilon\right)\,e^{-(1-\epsilon)\sqrt{3}\,\tau/\rho} \left(1-1/\epsilon\right)\,e^{-(1+\epsilon)\sqrt{3}\,\tau/\rho} \right] with the parameters ``sigma`` and ``rho``. The parameter ``eps`` controls the quality of the approximation since, in the limit :math:`\epsilon \to 0` this becomes the Matern-3/2 function .. math:: \lim_{\epsilon \to 0} k(\tau) = \sigma^2\,\left(1+ \frac{\sqrt{3}\,\tau}{\rho}\right)\, \exp\left(-\frac{\sqrt{3}\,\tau}{\rho}\right) This term should be used with care since there could be numerical issues with this approximation. Args: sigma: The parameter :math:`\sigma`. rho: The parameter :math:`\rho`. eps (optional): The value of the parameter :math:`\epsilon`. """ @staticmethod def get_test_parameters(): return dict(sigma=1.5, rho=2.345) def __init__(self, *, sigma, rho, eps=0.01): self.sigma = float(sigma) self.rho = float(rho) self.eps = float(eps) def get_coefficients(self): w0 = np.sqrt(3) / self.rho S0 = self.sigma**2 / w0 e = np.empty(0) return ( e, e, np.array([w0 * S0]), np.array([w0**2 * S0 / self.eps]), np.array([w0]), np.array([self.eps]), )
[docs] class RotationTerm(TermSum): r"""A mixture of two SHO terms that can be used to model stellar rotation This term has two modes in Fourier space: one at ``period`` and one at ``0.5 * period``. This can be a good descriptive model for a wide range of stochastic variability in stellar time series from rotation to pulsations. More precisely, the parameters of the two :class:`SHOTerm` terms are defined as .. math:: Q_1 = 1/2 + Q_0 + \delta Q \\ \omega_1 = \frac{4\,\pi\,Q_1}{P\,\sqrt{4\,Q_1^2 - 1}} \\ S_1 = \frac{\sigma^2}{(1 + f)\,\omega_1\,Q_1} for the primary term, and .. math:: Q_2 = 1/2 + Q_0 \\ \omega_2 = \frac{8\,\pi\,Q_1}{P\,\sqrt{4\,Q_1^2 - 1}} \\ S_2 = \frac{f\,\sigma^2}{(1 + f)\,\omega_2\,Q_2} for the secondary term. Args: sigma: The standard deviation of the process. period: The primary period of variability. Q0: The quality factor (or really the quality factor minus one half; this keeps the system underdamped) for the secondary oscillation. dQ: The difference between the quality factors of the first and the second modes. This parameterization (if ``dQ > 0``) ensures that the primary mode alway has higher quality. f: The fractional amplitude of the secondary mode compared to the primary. This should probably always be ``0 < f < 1``, but that is not enforced. """ @staticmethod def get_test_parameters(): return dict(sigma=1.5, period=3.45, Q0=1.3, dQ=1.05, f=0.5) def __init__(self, *, sigma, period, Q0, dQ, f): self.sigma = float(sigma) self.period = float(period) self.Q0 = float(Q0) self.dQ = float(dQ) self.f = float(f) self.amp = self.sigma**2 / (1 + self.f) # One term with a period of period Q1 = 0.5 + self.Q0 + self.dQ w1 = 4 * np.pi * Q1 / (self.period * np.sqrt(4 * Q1**2 - 1)) S1 = self.amp / (w1 * Q1) # Another term at half the period Q2 = 0.5 + self.Q0 w2 = 8 * np.pi * Q2 / (self.period * np.sqrt(4 * Q2**2 - 1)) S2 = self.f * self.amp / (w2 * Q2) super().__init__( SHOTerm(S0=S1, w0=w1, Q=Q1), SHOTerm(S0=S2, w0=w2, Q=Q2) )
[docs] class OriginalCeleriteTerm(Term): """A wrapper around terms defined using the original celerite package Args: term (celerite.terms.Term): The term defined using ``celerite``. """ def __init__(self, term): self.term = term def get_coefficients(self): return self.term.get_all_coefficients()