Source code for cometspec.collisions

"""Rotational-collision scaffolds and in-place collision application.

Routines
------------
- :func:`canonical_diatomic_name` -- canonicalize diatomic isotopologue labels.
- :func:`is_homonuclear_diatomic` -- detect homonuclear diatomics.
- :func:`is_atomic_species` -- detect atomic (non-diatomic) species such as Fe.
- :func:`diatomic_symmetry_class` -- classify diatomics for collision selection rules.
- :func:`precompute_collision_scaffold` -- build a collision scaffold from explicit lower-state columns in the transition table.
- :func:`precompute_collision_scaffold_fast` -- same as above but with cached arrays for fast updates.
- :func:`apply_collisions_inplace` -- apply collision rates to a matrix in place using the scaffold.
- :func:`apply_collisions_inplace_fast` -- same as above but using cached arrays and reusable buffers for speed.
"""

from __future__ import annotations

from typing import Dict, Sequence, Any

import re

import numpy as np

from astropy import constants as const
from astropy import units as u


__all__ = [
    "canonical_diatomic_name",
    "is_homonuclear_diatomic",
    "is_atomic_species",
    "diatomic_symmetry_class",
    "precompute_collision_scaffold",
    "precompute_collision_scaffold_fast",
    "apply_collisions_inplace",
    "apply_collisions_inplace_fast",
]


#: Planck constant times speed of light over Boltzmann constant, in K*cm.
HC_OVER_K_B_KCM: float = (const.h * const.c / const.k_B).to_value(u.K * u.cm)


def _empty_scaffold() -> dict:
    return dict(iu=np.array([], int), il=np.array([], int),
                gu=np.array([]), gl=np.array([]), dE=np.array([]))


# (mass_number, element) for common astrophysical isotopes with nonzero nuclear
# spin. Used to detect whether an isotopologue has hyperfine structure, which
# opens rotationally-elastic (Delta J = 0) collisional channels between
# hyperfine sublevels. Anything not in this set is assumed I=0.
_NONZERO_SPIN_ISOTOPES: set[tuple[int, str]] = {
    (1, "H"), (2, "H"),
    (13, "C"),
    (14, "N"), (15, "N"),
    (17, "O"),
    (19, "F"),
    (33, "S"),
}


def _has_nonzero_nuclear_spin(iso_name: str | None) -> bool:
    if iso_name is None:
        return False
    atoms = _parse_isotopologue_atoms(iso_name)
    return any(a in _NONZERO_SPIN_ISOTOPES for a in atoms)


[docs] def canonical_diatomic_name(iso_name: str | None) -> str | None: """Return a canonical label for a diatomic isotopologue. Atoms are sorted by (mass, element) so that labels like ``"13C12C"`` and ``"12C13C"`` collapse to the same canonical form (``"12C13C"``). Homonuclear labels with subscripts (e.g. ``"12C2"``) are preserved. If the label cannot be parsed as a diatomic, it is returned unchanged. Parameters ---------- iso_name : str, optional Isotopologue label, or ``None``. Returns ------- str or None Canonicalized isotopologue label, or ``None`` if the input was ``None`` """ if iso_name is None: return None atoms = _parse_isotopologue_atoms(iso_name) if len(atoms) != 2: return iso_name (m1, e1), (m2, e2) = atoms if (m1, e1) == (m2, e2): return f"{m1}{e1}2" a, b = sorted(atoms, key=lambda x: (x[0], x[1])) return f"{a[0]}{a[1]}{b[0]}{b[1]}"
def _parse_isotopologue_atoms(name: str) -> list[tuple[int, str]]: """Parse an isotopologue label like ``"12C2"``, ``"12C13C"``, or ``"12C14N"`` into atoms. Parameters ---------- name : str Isotopologue label using ``<mass><Element>[count]`` tokens (e.g., ``"12C2"`` -> two ``(12, "C")``; ``"12C13C"`` -> ``(12, "C")`` and ``(13, "C")``; ``"12C14N"`` -> ``(12, "C")`` and ``(14, "N")``). Returns ------- list[tuple[int, str]] List of ``(mass_number, element)`` tuples. Empty if ``name`` is unparseable. """ s = str(name) matches = list(re.finditer(r"(\d+)([A-Z][a-z]?)", s)) atoms: list[tuple[int, str]] = [] for i, m in enumerate(matches): mass = int(m.group(1)) elem = m.group(2) end = m.end() next_start = matches[i + 1].start() if i + 1 < len(matches) else len(s) count_match = re.match(r"\d+", s[end:next_start]) count = int(count_match.group()) if count_match else 1 atoms.extend([(mass, elem)] * count) return atoms
[docs] def is_homonuclear_diatomic(iso_name: str | None) -> bool: """Detect whether an isotopologue label denotes a homonuclear diatomic. Homonuclear means the same element AND the same isotope on both atoms (e.g., ``"12C2"``, ``"13C2"``). The mixed-isotope ``"12C13C"`` is treated as heteronuclear because the broken nuclear-permutation symmetry removes the even/odd-J restriction. Parameters ---------- iso_name : str, optional Isotopologue label, or ``None``. Returns ------- bool ``True`` only for diatomics with two identical ``(mass, element)`` atoms; ``False`` otherwise (including unparseable or non-diatomic labels). """ if iso_name is None: return False atoms = _parse_isotopologue_atoms(iso_name) if len(atoms) != 2: return False return atoms[0] == atoms[1]
[docs] def is_atomic_species(iso_name: str | None) -> bool: """Detect whether an isotopologue label denotes an atomic (non-diatomic) species. Atomic species have no rotational structure and therefore no rotational collision channels in this framework. Labels containing ``"Fe"`` are treated as atomic (matching the packaged default Fe line list). Any label that parses to a single atom (e.g., ``"56Fe"``) is also classified as atomic. Parameters ---------- iso_name : str, optional Isotopologue label, or ``None``. Returns ------- bool ``True`` for atomic species, ``False`` otherwise. """ if iso_name is None: return False if "Fe" in str(iso_name): return True atoms = _parse_isotopologue_atoms(iso_name) return len(atoms) == 1
[docs] def diatomic_symmetry_class(iso_name: str | None) -> str: """Classify a diatomic isotopologue label for collision selection rules. Parameters ---------- iso_name : str, optional Isotopologue label such as ``"12C2"``, ``"12C13C"``, ``"12C14N"``, or ``None``. Returns ------- str One of: * ``"homonuclear"`` -- two identical ``(mass, element)`` atoms (e.g., ``"12C2"``, ``"13C2"``). Only :math:`|\Delta J| = 2` collisions are physical (nuclear-spin manifold preserved). * ``"same_element_heteronuclear"`` -- same element, different isotopes (e.g., ``"12C13C"``). Symmetry is broken so all J exist, but the underlying near-symmetric structure makes both :math:`|\Delta J| = 1` and :math:`|\Delta J| = 2` channels relevant. * ``"heteronuclear"`` -- different elements (e.g., ``"12C14N"``). Treat as a generic heteronuclear with the caller-provided ``dJ_allowed`` (defaults to :math:`|\Delta J| = 1` only). * ``"unknown"`` -- label could not be parsed as a diatomic. """ atoms = _parse_isotopologue_atoms(iso_name) if iso_name else [] if len(atoms) != 2: return "unknown" (m1, e1), (m2, e2) = atoms if (m1, e1) == (m2, e2): return "homonuclear" if e1 == e2: return "same_element_heteronuclear" return "heteronuclear"
[docs] def precompute_collision_scaffold( lines_out: Any, idx_to_level: dict, *, upper_id_col: str = "upper_id", lower_id_col: str = "lower_id", lower_es_col: str = "lower_es", lower_v_col: str = "lower_v", lower_J_col: str = "lower_J", lower_sym_col: str = "lower_sym", E_lower_cm1_col: str = "E_lower_cm1", include_deltaJ0_parity_mix: bool = True, require_X_only: bool = True, iso_name: str | None = None, homonuclear: bool | None = None, dJ_allowed: Sequence[int] = (1,), ) -> dict: """Build a rotational-collision scaffold from explicit lower-state columns. Parameters ---------- lines_out : Any Annotated transition table. idx_to_level : dict Mapping from matrix index to level identifier. upper_id_col : str, optional, default "upper_id" Upper-level ID column name. lower_id_col : str, optional, default "lower_id" Lower-level ID column name. lower_es_col : str, optional, default "lower_es" Lower electronic-state column name. lower_v_col : str, optional, default "lower_v" Lower vibrational-state column name. lower_J_col : str, optional, default "lower_J" Lower rotational-state column name. lower_sym_col : str, optional, default "lower_sym" Lower-state symmetry/parity column name. E_lower_cm1_col : str, optional, default "E_lower_cm1" Lower-state energy column name in cm^-1. include_deltaJ0_parity_mix : bool, optional, default True Allow ``Delta J = 0`` collisions between sublevels with different ``sym`` label at the same J. Fires when either the molecule is truly heteronuclear (different elements, e.g. CN, OH) OR the isotopologue has at least one nucleus with nonzero spin (hyperfine structure, e.g. 13C2, 12C13C). Ignored only for strictly zero-hyperfine homonuclear species like 12C2. require_X_only : bool, optional, default True Restrict collisions to the lower electronic state. The lower state is auto-detected as the ``lower_es`` label whose minimum ``E_lower_cm1`` is smallest, so any spectroscopic notation works (``"X"``, ``"X1Sigmag+"``, etc.). iso_name : str, optional, default None Optional isotopologue label (e.g., ``"12C2"``, ``"13C2"``, ``"12C13C"``, ``"12C14N"``) used to auto-classify the diatomic via :func:`diatomic_symmetry_class`. Drives the ``|Delta J|`` rule: * homonuclear -> :math:`|\Delta J|=2` only * same-element heteronuclear (e.g., ``"12C13C"``) -> :math:`|\Delta J|=1` and :math:`|\Delta J|=2` * heteronuclear different elements (e.g., ``"12C14N"``) -> uses ``dJ_allowed``, which defaults to :math:`|\Delta J|=1` only (Arbitrary choice). Ignored when ``homonuclear`` is given explicitly. homonuclear : bool, optional, default None Explicit override for nuclear symmetry. ``True`` forces ``|Delta J| = 2`` only and disables ``include_deltaJ0_parity_mix`` (preserves nuclear-spin manifold). ``False`` uses ``dJ_allowed``. ``None`` auto-detects from ``iso_name``. dJ_allowed : Sequence[int], optional, default (1,) Allowed ``|Delta J|`` values for the heteronuclear different-element case (e.g., CN). Defaults to ``(1,)`` to match Manfroid 2009 [1]_. Ignored for homonuclear (forced to ``(2,)``) and for same-element heteronuclear (forced to ``(1, 2)``). Returns ------- dict Scaffold dictionary mapping pre-computed collisional pair data to NumPy arrays. All arrays share the same length (one entry per allowed collisional pair) and are aligned by index. Keys: * ``iu`` (:class:`numpy.ndarray` of :class:`int`) -- Matrix index of the upper level of each collisional pair. * ``il`` (:class:`numpy.ndarray` of :class:`int`) -- Matrix index of the lower level of each collisional pair. * ``gu`` (:class:`numpy.ndarray` of :class:`float`) -- Degeneracy of the upper level. * ``gl`` (:class:`numpy.ndarray` of :class:`float`) -- Degeneracy of the lower level. * ``dE`` (:class:`numpy.ndarray` of :class:`float`) -- Energy gap :math:`E_u - E_l` between the two levels, in :math:`\mathrm{cm}^{-1}` (always non-negative by construction). Raises ------ ValueError If required columns are missing. References ---------- .. [1] Manfroid, J., Jehin, E., Hutsemékers, D., et al. 2009, A&A, 503, 613 (`link <https://ui.adsabs.harvard.edu/abs/2009A&A...503..613M>`_) """ if lines_out is None or len(lines_out) == 0: return _empty_scaffold() def has_col(obj, col: str) -> bool: if hasattr(obj, "columns"): return col in obj.columns if hasattr(obj, "colnames"): return col in obj.colnames try: obj[col] return True except (KeyError, IndexError, AttributeError, TypeError): return False def col_as_array(obj, col: str) -> np.ndarray: return np.asarray(obj[col]) needed = [upper_id_col, lower_id_col, lower_es_col, lower_v_col, lower_J_col, lower_sym_col, E_lower_cm1_col] missing = [c for c in needed if not has_col(lines_out, c)] if missing: raise ValueError( "include_rotations=True requires these columns in the linelist: " + ", ".join(missing) ) level_id_to_idx: dict[str, int] = {} for i, v in idx_to_level.items(): level_id_to_idx[str(v)] = int(i) lower_ids = col_as_array(lines_out, lower_id_col).astype(str) les = col_as_array(lines_out, lower_es_col).astype(str) lv = col_as_array(lines_out, lower_v_col).astype(float) lJ = col_as_array(lines_out, lower_J_col).astype(float) lsym = col_as_array(lines_out, lower_sym_col).astype(str) Elow = col_as_array(lines_out, E_lower_cm1_col).astype(float) good = np.isfinite(lv) & np.isfinite(lJ) & np.isfinite(Elow) lower_ids, les, lv, lJ, lsym, Elow = lower_ids[good], les[good], lv[good], lJ[good], lsym[good], Elow[good] if require_X_only and lower_ids.size > 0: es_strs = np.array([str(es).strip() for es in les]) unique_es = np.unique(es_strs) es_min_E = {es: float(np.min(Elow[es_strs == es])) for es in unique_es} ground_es = min(es_min_E, key=es_min_E.get) m_ground = es_strs == ground_es lower_ids, les, lv, lJ, lsym, Elow = ( lower_ids[m_ground], les[m_ground], lv[m_ground], lJ[m_ground], lsym[m_ground], Elow[m_ground] ) if lower_ids.size == 0: return _empty_scaffold() if homonuclear is True: sym_class = "homonuclear" elif homonuclear is False: sym_class = "heteronuclear" else: sym_class = diatomic_symmetry_class(iso_name) if sym_class == "homonuclear": dJ_set = {2} elif sym_class == "same_element_heteronuclear": dJ_set = {1, 2} else: dJ_set = {int(d) for d in dJ_allowed} has_hyperfine = _has_nonzero_nuclear_spin(iso_name) keys = list(zip(les.astype(str), lv.astype(float), lJ.astype(float), lsym.astype(str))) from collections import defaultdict E_by_key = defaultdict(list) id_by_key = {} for k, e, lid in zip(keys, Elow, lower_ids): if np.isfinite(e): E_by_key[k].append(float(e)) if k not in id_by_key: id_by_key[k] = str(lid) unique_keys = [k for k in E_by_key.keys() if len(E_by_key[k]) > 0 and k in id_by_key] if len(unique_keys) < 2: return _empty_scaffold() E_cm1_level = {k: float(np.median(E_by_key[k])) for k in unique_keys} key_to_idx = {} for k in unique_keys: lid = id_by_key[k] if lid in level_id_to_idx: key_to_idx[k] = level_id_to_idx[lid] ground = [k for k in unique_keys if k in key_to_idx] if len(ground) < 2: return _empty_scaffold() ground = sorted(ground, key=lambda k: (float(k[2]), str(k[3]))) iu_list, il_list, gu_list, gl_list, dE_list = [], [], [], [], [] seen = set() for a in range(len(ground)): esa, va, Ja, sa = ground[a] for b in range(a + 1, len(ground)): esb, vb, Jb, sb = ground[b] dJ = abs(float(Ja) - float(Jb)) allow = (int(dJ) in dJ_set) or ( (sym_class == "heteronuclear" or has_hyperfine) and include_deltaJ0_parity_mix and dJ == 0 and str(sa) != str(sb) ) if not allow: continue Ea = E_cm1_level[ground[a]] Eb = E_cm1_level[ground[b]] if Eb > Ea: ku, kl = ground[b], ground[a] dE_cm1 = Eb - Ea else: ku, kl = ground[a], ground[b] dE_cm1 = Ea - Eb mu = int(key_to_idx[ku]) ml = int(key_to_idx[kl]) keypair = (min(mu, ml), max(mu, ml)) if keypair in seen: continue seen.add(keypair) gu = 2.0 * float(ku[2]) + 1.0 gl = 2.0 * float(kl[2]) + 1.0 iu_list.append(mu) il_list.append(ml) gu_list.append(gu) gl_list.append(gl) dE_list.append(dE_cm1) return dict( iu=np.asarray(iu_list, int), il=np.asarray(il_list, int), gu=np.asarray(gu_list, float), gl=np.asarray(gl_list, float), dE=np.asarray(dE_list, float), )
[docs] def precompute_collision_scaffold_fast(*args, **kwargs) -> dict: """Build collision scaffold and add cached arrays for fast updates. Parameters ---------- args : tuple Positional arguments forwarded to :func:`precompute_collision_scaffold`. kwargs : dict Keyword arguments forwarded to :func:`precompute_collision_scaffold`. Returns ------- dict Collision scaffold with ``dE_over_k_K`` and ``gu_over_gl`` caches. """ sc = precompute_collision_scaffold(*args, **kwargs) if sc.get("iu", np.array([])).size == 0: sc["dE_over_k_K"] = np.array([], float) sc["gu_over_gl"] = np.array([], float) return sc dE_cm1 = np.asarray(sc["dE"], float) gu = np.asarray(sc["gu"], float) gl = np.asarray(sc["gl"], float) sc["dE_over_k_K"] = HC_OVER_K_B_KCM * dE_cm1 sc["gu_over_gl"] = gu / gl return sc
[docs] def apply_collisions_inplace(M: np.ndarray, scaffold: Dict[str, np.ndarray], *, Q: float, T: float) -> np.ndarray: """Apply rotational-collision rates to a matrix in place. Parameters ---------- M : numpy.ndarray Rate matrix to modify. scaffold : dict[str, numpy.ndarray] Collision scaffold containing ``iu``, ``il``, ``gu``, ``gl``, and ``dE``. Q : float Downward collision rate scale. T : float Kinetic temperature in K. Returns ------- numpy.ndarray The modified matrix ``M``. """ if scaffold.get("iu", np.array([])).size == 0 or Q <= 0: return M iu = scaffold["iu"] il = scaffold["il"] gu = scaffold["gu"] gl = scaffold["gl"] dE_cm1 = scaffold["dE"] kT = (const.k_B * (T * u.K)).to(u.erg).value dE_erg = (const.h * const.c * (dE_cm1 / u.cm)).to(u.erg).value Cdown = float(Q) Cup = (gu / gl) * Cdown * np.exp(-dE_erg / kT) np.add.at(M, (iu, iu), -Cdown) np.add.at(M, (il, il), -Cup) np.add.at(M, (il, iu), Cdown) np.add.at(M, (iu, il), Cup) return M
[docs] def apply_collisions_inplace_fast( M: np.ndarray, scaffold: Dict[str, np.ndarray], *, Q: float, T: float, Cup_work: np.ndarray, ) -> np.ndarray: """Apply collisions in place using cached arrays and reusable buffers. Parameters ---------- M : numpy.ndarray Rate matrix to modify. scaffold : dict[str, numpy.ndarray] Collision scaffold with ``dE_over_k_K`` and ``gu_over_gl``. Q : float Downward collision rate scale. T : float Kinetic temperature in K. Cup_work : numpy.ndarray Reusable working array for upward rates. Returns ------- numpy.ndarray The modified matrix ``M``. """ iu = scaffold.get("iu", None) if iu is None or iu.size == 0 or Q <= 0.0 or not np.isfinite(T) or T <= 0.0: return M il = scaffold["il"] dE_over_k_K = scaffold["dE_over_k_K"] gu_over_gl = scaffold["gu_over_gl"] np.divide(dE_over_k_K, T, out=Cup_work) np.negative(Cup_work, out=Cup_work) np.exp(Cup_work, out=Cup_work) Cup_work *= gu_over_gl Cup_work *= Q Cdown = Q np.add.at(M, (iu, iu), -Cdown) np.add.at(M, (il, il), -Cup_work) np.add.at(M, (il, iu), Cdown) np.add.at(M, (iu, il), Cup_work) return M