Source code for cometspec.rates

"""Radiative rate matrix, steady-state solver, g-factors, spectrum synthesis,
and line-spread-function helpers.

Routines
------------
- :func:`build_rate_matrix_nbar` -- Build the radiative rate matrix from normalized transitions.
- :func:`solve_with_normalization` -- Solve the steady-state system ``M @ n = 0`` with ``sum(n) = 1``.
- :func:`solve_with_normalization_fast` -- Fast buffer-reusing version of :func:`solve_with_normalization`.
- :func:`g_factors` -- Compute per-line and total photon/energy g-factors.
- :func:`g_factors_fast_from_cache` -- Fast g-factor evaluation using precomputed arrays.
- :func:`synth_spectrum_from_lines` -- Build a synthetic emission spectrum from line g-factors.
- :func:`make_lsf` -- Create a line-spread function callable from parameter values.
"""
from __future__ import annotations

from typing import Dict, Tuple, Optional, Callable, Any

import numpy as np

from astropy import constants as const
from astropy import units as u
from astropy.table import Table
import warnings
from scipy.linalg import LinAlgWarning, solve
from scipy.signal import fftconvolve



__all__ = [
    "build_rate_matrix_nbar",
    "solve_with_normalization",
    "solve_with_normalization_fast",
    "g_factors",
    "g_factors_fast_from_cache",
    "synth_spectrum_from_lines",
    "make_lsf",
]


def _as_array(obj: Any, name: str) -> np.ndarray:
    """Return a named column as a NumPy array.

    Parameters
    ----------
    obj : Any
        Table-like container with named columns.
    name : str
        Column name.

    Returns
    -------
    numpy.ndarray
        Column values as a NumPy array.
    """
    if hasattr(obj, "colnames"):        # astropy Table
        return np.asarray(obj[name])
    if hasattr(obj, "columns"):         # pandas DataFrame
        return np.asarray(obj[name].values)
    return np.asarray(obj[name])


[docs] def build_rate_matrix_nbar( lines: Table, *, include_stim_emission: bool = False, verbose: bool = True, A_col: str = "A_ul", upper_id_col: str = "upper_id", lower_id_col: str = "lower_id", g_upper_col: str = "g_upper", g_lower_col: str = "g_lower", ): """Build the radiative rate matrix from normalized transitions. Parameters ---------- lines : astropy.table.Table Transition table with solar incident irradiance quantities attached. include_stim_emission : bool, optional, default False Include stimulated emission in downward rates. verbose : bool, optional, default True Print diagnostic information. A_col : str, optional, default "A_ul" Einstein A column name. upper_id_col : str, optional, default "upper_id" Upper-level ID column name. lower_id_col : str, optional, default "lower_id" Lower-level ID column name. g_upper_col : str, optional, default "g_upper" Upper-level degeneracy column name. g_lower_col : str, optional, default "g_lower" Lower-level degeneracy column name. Returns ------- tuple[numpy.ndarray, dict[int, str], astropy.table.Table] Rate matrix, a dictionary that maps each row/column index of ``M`` back to its physical level identifier (the string from ``upper_id_col`` / ``lower_id_col``), and the input line lists with new columns (`__nu_Hz`, `__R_lu`, `__R_ul`, `__upper_idx`, `__lower_idx`). """ lines_out = lines.copy() nu = np.asarray(lines_out["Frequency_Hz"], float) * u.Hz A_ul = np.asarray(lines_out[A_col], float) / u.s Jnu_cgs = np.asarray(lines_out["J_nu_erg_cm2_s_Hz_sr"], float) * (u.erg / (u.cm**2 * u.s * u.Hz * u.sr)) Jnu_SI = Jnu_cgs.to(u.W / (u.m**2 * u.Hz * u.sr)) gu = np.asarray(lines_out[g_upper_col], float) gl = np.asarray(lines_out[g_lower_col], float) B_lu = (A_ul * const.c**2 / (2.0 * const.h * nu**3) * (gu / gl)).decompose().value B_ul = (A_ul * const.c**2 / (2.0 * const.h * nu**3)).decompose().value R_lu = B_lu * Jnu_SI.value R_ul = A_ul.value if include_stim_emission: R_ul = R_ul + B_ul * Jnu_SI.value upper_ids = np.asarray(lines_out[upper_id_col], str) lower_ids = np.asarray(lines_out[lower_id_col], str) level_to_idx: Dict[str, int] = {} upper_idx: list[int] = [] lower_idx: list[int] = [] for u_id, l_id in zip(upper_ids, lower_ids): if u_id not in level_to_idx: level_to_idx[u_id] = len(level_to_idx) if l_id not in level_to_idx: level_to_idx[l_id] = len(level_to_idx) upper_idx.append(level_to_idx[u_id]) lower_idx.append(level_to_idx[l_id]) idx_to_level = {v: k for k, v in level_to_idx.items()} n_levels = len(idx_to_level) M = np.zeros((n_levels, n_levels), float) def add_rate(dest: int, src: int, rate: float): if not np.isfinite(rate) or rate <= 0.0: return M[src, src] -= rate M[dest, src] += rate for iu, il, rlu, rul in zip(upper_idx, lower_idx, R_lu, R_ul): add_rate(iu, il, float(rlu)) add_rate(il, iu, float(rul)) lines_out["__nu_Hz"] = nu.to_value(u.Hz) lines_out["__R_lu"] = np.asarray(R_lu, float) lines_out["__R_ul"] = np.asarray(R_ul, float) lines_out["__upper_idx"] = np.asarray(upper_idx, int) lines_out["__lower_idx"] = np.asarray(lower_idx, int) if verbose: print(f"[diag] N levels: {n_levels} | all finite: {np.isfinite(M).all()}") return M, idx_to_level, lines_out
[docs] def solve_with_normalization(M: np.ndarray, *, verbose: bool = True) -> np.ndarray: """Solve the steady-state system ``Mn = 0`` with ``sum(n) = 1``. Parameters ---------- M : numpy.ndarray Rate matrix. verbose : bool, optional, default True Print solver diagnostics. Returns ------- numpy.ndarray Normalized non-negative level populations. Raises ------ RuntimeError If no positive normalized solution can be formed. Notes ----- If ``np.linalg.lstsq`` fails to converge (e.g. because M contains NaN or Inf from extreme parameter values), a ``LinAlgError`` is caught and an array of NaN is returned. In the MCMC context this propagates to a non-finite model flux so the walker step is rejected (log-likelihood → -inf) rather than crashing. In the direct-modelling context ``best_model`` will contain NaN, signalling that the chosen parameters are unphysical. """ n_levels = M.shape[0] A = M.astype(float) b = np.zeros(n_levels) A[0, :] = 1.0 b[0] = 1.0 try: n, *_ = np.linalg.lstsq(A, b, rcond=None) except np.linalg.LinAlgError: return np.full(n_levels, np.nan) if np.any(n < 0): n = np.clip(n, 0.0, None) s = n.sum() if s <= 0: raise RuntimeError("Degenerate M: no positive steady-state solution.") n /= s if verbose: print("[solver] sum(n) =", n.sum()) return n
[docs] def solve_with_normalization_fast(M, A_work, b_work): """Fast buffer-reusing version of :func:`solve_with_normalization`. Parameters ---------- M : numpy.ndarray Rate matrix. A_work : numpy.ndarray Reusable matrix buffer. b_work : numpy.ndarray Reusable right-hand-side buffer. Returns ------- numpy.ndarray Normalized non-negative level populations. Raises ------ RuntimeError If no positive normalized solution can be formed. Notes ----- The first row of ``M`` is overwritten with ones (and ``b[0] = 1``) to impose ``sum(n) = 1`` directly, so the resulting square system is non-singular for any well-posed rate matrix and is solved with :func:`scipy.linalg.solve` (LU, with ``overwrite_a``/``overwrite_b`` and ``check_finite=False`` to skip the input scan and reuse the work buffers in-place — several times faster than the SVD-based :func:`numpy.linalg.lstsq` used by :func:`solve_with_normalization`). If the matrix is singular (e.g. NaN/Inf collision rates at extreme parameter values), the raised ``LinAlgError`` is caught and an array of NaN is returned. In the MCMC context this propagates to a non-finite model flux so the walker step is rejected (log-likelihood -> -inf) rather than crashing. In the direct-modelling context ``best_model`` will contain NaN, signalling that the chosen parameters are unphysical. """ np.copyto(A_work, M) b_work.fill(0.0) A_work[0, :] = 1.0 b_work[0] = 1.0 # Rank-deficiency / extreme-ill-conditioning screen. LU and lstsq pick # different valid solutions on (near-)singular systems, so route those # to lstsq (minimum-norm) to stay consistent with solve_with_normalization. # Cheap O(N) test: zero diagonals (dead-end levels) or wide dynamic range # in loss rates (effective rank deficiency from huge magnitude spread). diag = np.abs(np.diag(M)) diag_max = diag.max() if diag.size else 0.0 diag_min = diag.min() if diag.size else 0.0 rank_deficient = diag_max > 0.0 and diag_min < 1e-12 * diag_max if rank_deficient: lu_ok = False n = None else: try: with warnings.catch_warnings(): warnings.simplefilter("ignore", LinAlgWarning) n = solve(A_work, b_work, overwrite_a=True, overwrite_b=True, check_finite=False, assume_a='gen') lu_ok = np.all(np.isfinite(n)) if lu_ok: resid = np.linalg.norm(M @ n, ord=np.inf) scale = np.linalg.norm(M, ord=np.inf) * np.linalg.norm(n, ord=np.inf) if scale > 0.0 and resid / scale > 1e-6: lu_ok = False except np.linalg.LinAlgError: lu_ok = False n = None if not lu_ok: A_fb = M.astype(float, copy=True) A_fb[0, :] = 1.0 b_fb = np.zeros(M.shape[0], dtype=float) b_fb[0] = 1.0 try: n, *_ = np.linalg.lstsq(A_fb, b_fb, rcond=None) except np.linalg.LinAlgError: return np.full(M.shape[0], np.nan) if not np.all(np.isfinite(n)): return np.full(M.shape[0], np.nan) n = np.clip(n, 0.0, None) s = n.sum() if s <= 0.0: return np.full(M.shape[0], np.nan) n = n / s return n
[docs] def g_factors( lines_with_rates: Table, n: np.ndarray, *, A_col: str = "A_ul", ): """Compute per-line and total photon/energy g-factors. The g-factors are in units of photons/s/molecule for ``g_phot`` and erg/s/molecule for ``g_energy``. Parameters ---------- lines_with_rates : astropy.table.Table Transition table with cached rate-matrix indices. n : numpy.ndarray Level populations. A_col : str, optional, default "A_ul" Einstein A column name. Returns ------- tuple[numpy.ndarray, numpy.ndarray, float, float] ``(g_phot, g_energy, sum_g_phot, sum_g_energy)``. """ nu = np.asarray(lines_with_rates["__nu_Hz"], float) A_ul = np.asarray(lines_with_rates[A_col], float) ui = np.asarray(lines_with_rates["__upper_idx"], int) nu = np.nan_to_num(nu, 0.0, 0.0, 0.0) A_ul = np.nan_to_num(A_ul, 0.0, 0.0, 0.0) n_u = n[ui] g_ph = n_u * A_ul g_en = const.h.cgs.value * nu * g_ph g_ph = np.nan_to_num(g_ph, 0.0, 0.0, 0.0) g_en = np.nan_to_num(g_en, 0.0, 0.0, 0.0) return g_ph, g_en, float(g_ph.sum()), float(g_en.sum())
[docs] def g_factors_fast_from_cache( *, ui: np.ndarray, A_ul: np.ndarray, hnu: np.ndarray, n: np.ndarray, out_g_ph: Optional[np.ndarray] = None, out_g_en: Optional[np.ndarray] = None, ) -> tuple[np.ndarray, np.ndarray]: """Fast g-factor evaluation using precomputed arrays. Parameters ---------- ui : numpy.ndarray Upper-level indices per line. A_ul : numpy.ndarray Einstein A coefficients per line. hnu : numpy.ndarray ``h * nu`` per line in erg. n : numpy.ndarray Level populations. out_g_ph : numpy.ndarray, optional, default None Optional output buffer for photon g-factors. out_g_en : numpy.ndarray, optional, default None Optional output buffer for energy g-factors. Returns ------- tuple[numpy.ndarray, numpy.ndarray] ``(g_ph, g_en)`` arrays. """ n_u = n[ui] if out_g_ph is None: g_ph = n_u * A_ul else: np.multiply(n_u, A_ul, out=out_g_ph) g_ph = out_g_ph if out_g_en is None: g_en = hnu * g_ph else: np.multiply(hnu, g_ph, out=out_g_en) g_en = out_g_en np.nan_to_num(g_ph, copy=False, nan=0.0, posinf=0.0, neginf=0.0) np.nan_to_num(g_en, copy=False, nan=0.0, posinf=0.0, neginf=0.0) return g_ph, g_en
[docs] def synth_spectrum_from_lines( df_lines: Table, N_col_cm2: float, *, g_line_energy: Optional[np.ndarray] = None, g_line_phot: Optional[np.ndarray] = None, fwhm_A: float = 0.02, dlam_A: float = 0.05, lam_min: Optional[float] = None, lam_max: Optional[float] = None, lam_col: str = "Wave_vac_AA", Omega_sr: Optional[float] = None, grid: Optional[np.ndarray] = None, lsf: Optional[Callable[[np.ndarray], np.ndarray]] = None, v_shift_kms: float = 0.0, dlam_shift_A: float = 0.0, ) -> Tuple[np.ndarray, np.ndarray]: """Build a synthetic emission spectrum from line g-factors. Parameters ---------- df_lines : astropy.table.Table Transition table. **Required** N_col_cm2 : float Column density in cm^-2. **Required** g_line_energy : numpy.ndarray, optional, default None Per-line energy g-factors. If not provided ``g_line_phot`` **must** be used to compute line intensities. g_line_phot : numpy.ndarray, optional, default None Per-line photon g-factors. If not provided, ``g_line_energy`` **must** be used to compute line intensities. fwhm_A : float, optional, default 0.02 Gaussian FWHM in Angstrom if no custom LSF is provided. dlam_A : float, optional, default 0.05 Wavelength step for an auto-generated grid. lam_min : float, optional, default None Optional minimum wavelength (after wavelength shifts) for an auto-generated grid. The minimum grid value will be at least 5 FWHM below the minimum line wavelength. lam_max : float, optional, default None Optional maximum wavelength (after wavelength shifts) for an auto-generated grid. The maximum grid value will be at least 5 FWHM above the maximum line wavelength. lam_col : str, optional, default "Wave_vac_AA". If None, it will be set to "Wave_vac_AA". Wavelength column in ``df_lines``. Omega_sr : float, optional, default None Optional solid angle scaling in sr. grid : numpy.ndarray, optional, default None Optional output wavelength grid. lsf : Callable[[numpy.ndarray], numpy.ndarray], optional, default None Optional custom line-spread function. v_shift_kms : float, optional, default 0.0 Velocity shift in km/s for the synthetic spectrum lines. dlam_shift_A : float, optional, default 0.0 Additive wavelength shift in Angstrom for the synthetic spectrum lines. Returns ------- tuple[numpy.ndarray, numpy.ndarray] ``(wavelength_grid, synthetic_flux)``. Raises ------ ValueError If required inputs are missing. """ if lam_col not in df_lines.colnames: raise ValueError(f"{lam_col!r} not found in df_lines.") lam_rest = np.asarray(df_lines[lam_col], float) if v_shift_kms is not None: c_kms = const.c.to("km/s").value lam = lam_rest * (1.0 + v_shift_kms / c_kms) if dlam_shift_A != 0.0: lam = lam + dlam_shift_A else: lam = lam_rest + dlam_shift_A if g_line_energy is not None: I_line = np.asarray(g_line_energy, float) elif g_line_phot is not None: if "__nu_Hz" in df_lines.colnames: nu = np.asarray(df_lines["__nu_Hz"], float) else: nu = (const.c / (lam * u.AA)).to_value(u.Hz) I_line = const.h.cgs.value * nu * np.asarray(g_line_phot, float) else: raise ValueError("Provide g_line_energy or g_line_phot.") m = np.isfinite(lam) & np.isfinite(I_line) & (I_line > 0.0) lam = lam[m] I_line = I_line[m] if lam.size == 0: if grid is None: return np.array([]), np.array([]) return np.asarray(grid, float), np.zeros_like(grid, float) if lam_min is None: lam_min = float(lam.min() - 5.0 * fwhm_A) if lam_max is None: lam_max = float(lam.max() + 5.0 * fwhm_A) if grid is None: if dlam_A <= 0.0: dlam_A = fwhm_A / 3.0 ngrid = int(np.ceil((lam_max - lam_min) / dlam_A)) + 1 grid = lam_min + np.arange(ngrid, dtype=float) * dlam_A else: grid = np.asarray(grid, float) if lsf is None: sigma = fwhm_A / (2.0 * np.sqrt(2.0 * np.log(2.0))) norm = sigma * np.sqrt(2.0 * np.pi) spec_per_mol = np.zeros_like(grid, dtype=float) for l0, I0 in zip(lam, I_line): spec_per_mol += I0 * np.exp(-0.5 * ((grid - l0) / sigma) ** 2) / norm else: # Stick-and-convolve fast path for translation-invariant LSFs on a # uniform grid: build a delta-stick spectrum (each line linearly # distributed into its two nearest bins), then FFT-convolve once with # a single LSF kernel. This replaces the O(N_grid * N_lines) dense # outer product with O(N_grid log N_grid + N_lines), which is the # dominant per-step cost in the MCMC inner loop when the line list is # large. Falls back to the dense path when the grid is non-uniform. N_grid = grid.size if N_grid >= 2: dlam_grid = float(grid[1] - grid[0]) uniform = ( dlam_grid > 0.0 and np.allclose(np.diff(grid), dlam_grid, rtol=1e-6, atol=0.0) ) else: uniform = False if uniform: x = (lam - grid[0]) / dlam_grid idx_lo = np.floor(x).astype(np.intp) frac = x - idx_lo m_lo = (idx_lo >= 0) & (idx_lo < N_grid) m_hi = (idx_lo + 1 >= 0) & (idx_lo + 1 < N_grid) sticks = np.zeros(N_grid, dtype=float) if m_lo.any(): sticks += np.bincount( idx_lo[m_lo], weights=((1.0 - frac) * I_line)[m_lo], minlength=N_grid, ) if m_hi.any(): sticks += np.bincount( idx_lo[m_hi] + 1, weights=(frac * I_line)[m_hi], minlength=N_grid, ) M_k = N_grid if (N_grid % 2 == 1) else (N_grid - 1) K = M_k // 2 dl_kernel = (np.arange(M_k) - K) * dlam_grid kernel = lsf(dl_kernel) spec_per_mol = fftconvolve(sticks, kernel, mode="same") else: dl = grid[:, None] - lam[None, :] prof = lsf(dl) spec_per_mol = (prof * I_line).sum(axis=1) I_lambda = (N_col_cm2 / (4.0 * np.pi)) * spec_per_mol if Omega_sr is not None: return grid, I_lambda * Omega_sr else: return grid, I_lambda
# ============================================================================= # LSF helper # ============================================================================= class _Gauss2LSF: """Two-Gaussian line-spread function: ``ratio * G(sigma1) + (1-ratio) * G(sigma2)``.""" def __init__(self, sigma1: float, sigma2: float, ratio: float): self.sigma1 = float(sigma1) self.sigma2 = float(sigma2) self.ratio = float(ratio) def __call__(self, dl: np.ndarray) -> np.ndarray: g1 = np.exp(-0.5 * (dl / self.sigma1) ** 2) / (self.sigma1 * np.sqrt(2.0 * np.pi)) g2 = np.exp(-0.5 * (dl / self.sigma2) ** 2) / (self.sigma2 * np.sqrt(2.0 * np.pi)) return self.ratio * g1 + (1.0 - self.ratio) * g2 class _GaussLSF: """Single-Gaussian line-spread function with width ``sigma``.""" def __init__(self, sigma: float): self.sigma = float(sigma) def __call__(self, dl: np.ndarray) -> np.ndarray: return np.exp(-0.5 * (dl / self.sigma) ** 2) / (self.sigma * np.sqrt(2.0 * np.pi)) class _GaussLorentzLSF: """Gaussian + Lorentzian mixture: ``ratio * G(sigma_G) + (1-ratio) * L(fwhm_L)``.""" def __init__(self, sigma_G: float, fwhm_L: float, ratio: float): self.sigma_G = float(sigma_G) self.fwhm_L = float(fwhm_L) self.ratio = float(ratio) def __call__(self, dl: np.ndarray) -> np.ndarray: gamma = self.fwhm_L / 2.0 A = 2.0 / (np.pi * self.fwhm_L) gauss = np.exp(-0.5 * (dl / self.sigma_G) ** 2) / (self.sigma_G * np.sqrt(2.0 * np.pi)) lorentz = A * gamma**2 / (gamma**2 + dl**2) return self.ratio * gauss + (1.0 - self.ratio) * lorentz class _LorentzLSF: """Lorentzian line-spread function with full width at half max ``fwhm_L``.""" def __init__(self, fwhm_L: float): self.fwhm_L = float(fwhm_L) def __call__(self, dl: np.ndarray) -> np.ndarray: gamma = self.fwhm_L / 2.0 A = 2.0 / (np.pi * self.fwhm_L) return A * gamma**2 / (gamma**2 + dl**2)
[docs] def make_lsf(params: Dict[str, float], mode: str) -> Optional[Callable[[np.ndarray], np.ndarray]]: """Create a line-spread function callable from parameter values. - If LSF mode == "2Gauss", the parameters are ``sigma1``, ``sigma2``, and ``ratio``. - If LSF mode == "Gauss", the parameter is ``sigma``. - If LSF mode == "Gauss_Lorentz", the parameters are ``sigma_G``, ``fwhm_L``, and ``ratio``. - If LSF mode == "Lorentz", the parameter is ``fwhm_L``. Parameters ---------- params : dict[str, float] LSF parameter dictionary. mode : str LSF mode: ``2Gauss``, ``Gauss``, ``Gauss_Lorentz``, or ``Lorentz``. Returns ------- Callable[[numpy.ndarray], numpy.ndarray] LSF callable. Returned as a small picklable class instance (not a closure), so it can be sent to ``spawn``-based multiprocessing workers when used as the ``lsf`` argument to :func:`cometspec.mcmc.mcmc_fitting`. Raises ------ ValueError If ``mode`` is invalid. """ if mode == "2Gauss": return _Gauss2LSF( sigma1=params.get("sigma1", 0.01), sigma2=params.get("sigma2", 0.005), ratio=params.get("ratio", 0.9), ) if mode == "Gauss": return _GaussLSF(sigma=params.get("sigma", 0.01)) if mode == "Gauss_Lorentz": return _GaussLorentzLSF( sigma_G=params.get("sigma_G", 0.01), fwhm_L=params.get("fwhm_L", 0.02), ratio=params.get("ratio", 0.9), ) if mode == "Lorentz": return _LorentzLSF(fwhm_L=params.get("fwhm_L", 0.02)) raise ValueError("Invalid LSF mode.")