"""Line-list parsing and normalization routines.
Routines
--------
- :func:`normalize_cn_systems_arg` -- Normalize user-friendly CN system selectors to canonical tokens.
- :func:`from_user_linelist` -- Convert a user line list into the normalized transition schema.
- :func:`make_sym` -- Build a symmetry label.
- :func:`from_cn_brooke` -- Convert a Brooke CN line list (e.g. from :func:`~cometspec.helper.load_cn_linelist`) to the normalized schema.
- :func:`filter_cn_systems` -- Filter a Brooke CN line list by system, wavelength, and A (Einstein coefficient) threshold.
- :func:`load_default_transitions` -- Load and normalize packaged transitions per isotopologue.
- :func:`resolve_linelists_with_defaults` -- Resolve user-supplied linelists, filling in defaults for any missing isotopologues.
- :func:`default_linelist_source` -- Return the file path that would be loaded for a given isotopologue from packaged defaults.
- :func:`linelist_origins` -- Return a mapping of isotopologue to source description for a set of linelists.
- :func:`attach_pumping_and_labels` -- Attach pumping information and human-friendly labels to a transition table. **This is important as it ensures the solar pumping information is correctly associated with each transition.**
"""
from __future__ import annotations
from pathlib import Path
from typing import Sequence, Optional, Callable, Any
import re
import warnings
import numpy as np
import pandas as pd
from astropy import constants as const
from astropy import units as u
from astropy.table import Table
from . import helper
from .collisions import canonical_diatomic_name
# Public API of this module: the names exported by a star-import. Private
# helpers (leading underscore) are intentionally left out.
__all__ = [
    "normalize_cn_systems_arg",
    "from_user_linelist",
    "make_sym",
    "from_cn_brooke",
    "filter_cn_systems",
    "load_default_transitions",
    "resolve_linelists_with_defaults",
    "default_linelist_source",
    "linelist_origins",
    "attach_pumping_and_labels",
]
#: Absolute directory that contains this module (the package directory).
PACKAGE_DIR: Path = Path(__file__).resolve().parent
#: Directory with the packaged data files (``PACKAGE_DIR / "data"``). The
#: default CN, C2 and Fe line lists are looked up below this directory.
DATA_DIR: Path = PACKAGE_DIR / "data"
def _as_list(x: str | Sequence[str] | None) -> list[str]:
'''Normalize a string or sequence of strings to a list of strings.
Parameters
----------
x : str or Sequence[str], optional, default None
Input string or sequence of strings.
Returns
-------
list[str]
Normalized list of strings.
'''
if x is None:
return []
if isinstance(x, str):
return [x]
return list(x)
[docs]
def normalize_cn_systems_arg(systems: str | Sequence[str] | None) -> list[str]:
    r"""Map user-friendly CN band-system selectors onto canonical tokens.

    Selectors are matched case-insensitively after stripping surrounding
    whitespace. A sequence of selectors is normalized element by element; the
    results are concatenated and de-duplicated, keeping first-occurrence order.

    Canonical (output) tokens:

    * ``"BX00"`` -- :math:`B^{2}\Sigma^{+} \to X^{2}\Sigma^{+}` violet system,
      :math:`(v', v'') = (0, 0)` band (~388 nm).
    * ``"AX_dv0"`` -- :math:`A^{2}\Pi \to X^{2}\Sigma^{+}` red system,
      :math:`\Delta v = |v' - v''| = 0` sequence.
    * ``"AX_dv1"`` -- A–X red system, :math:`\Delta v = 1` sequence.
    * ``"AX_dv2"`` -- A–X red system, :math:`\Delta v = 2` sequence.
    * ``"AX_dv3"`` -- A–X red system, :math:`\Delta v = 3` sequence.
    * ``"XX"`` -- all X–X transitions.
    * ``"ALL"`` -- every transition; expect extremely long computation times.

    Recognized inputs:

    * ``None`` -- default selection, ``["BX00", "AX_dv1"]``.
    * ``"both"``, ``"bx+ax"``, ``"bxax"`` --
      ``["BX00", "AX_dv1", "AX_dv2", "AX_dv3"]``.
    * ``"all"`` -- ``["ALL"]``.
    * ``"bx"``, ``"b-x"``, ``"bx(0,0)"``, ``"bx00"``, ``"bx_00"``,
      ``"b_x_00"`` -- ``["BX00"]``.
    * ``"ax"``, ``"a-x"`` -- ``["AX_dv1", "AX_dv2"]``.
    * ``"ax(dv=N)"``, ``"ax_dvN"`` for ``N`` in 0..3 -- ``["AX_dvN"]``.
    * ``"xx"`` -- ``["XX"]``.
    * Any other string -- a warning is emitted and the string is returned
      unchanged (not stripped, not lower-cased) as a one-element list, so the
      caller can handle or reject it.

    Parameters
    ----------
    systems : str or sequence of str, optional
        Band-system selector(s); see the recognized inputs above.

    Returns
    -------
    list of str
        Canonical tokens in input order, without duplicates.

    Examples
    --------
    .. code-block:: python

        normalize_cn_systems_arg(None)
        ['BX00', 'AX_dv1']
        normalize_cn_systems_arg("both")
        ['BX00', 'AX_dv1', 'AX_dv2', 'AX_dv3']
        normalize_cn_systems_arg(["bx", "ax_dv1", "bx"])  # dedup, order kept
        ['BX00', 'AX_dv1']
        normalize_cn_systems_arg("unknown")
        ['unknown']
    """
    if systems is None:
        return ["BX00", "AX_dv1"]

    if not isinstance(systems, str):
        # Sequence input: normalize every element, then drop repeats. A dict
        # keeps insertion order, so first occurrences win.
        flattened: list[str] = []
        for selector in systems:
            flattened += normalize_cn_systems_arg(selector)
        return list(dict.fromkeys(flattened))

    # (accepted spellings) -> (canonical tokens), checked in order.
    alias_groups = (
        (("both", "bx+ax", "bxax"), ("BX00", "AX_dv1", "AX_dv2", "AX_dv3")),
        (("all",), ("ALL",)),
        (("bx", "b-x", "bx(0,0)", "bx00", "bx_00", "b_x_00"), ("BX00",)),
        (("ax", "a-x"), ("AX_dv1", "AX_dv2")),
        (("ax(dv=0)", "ax_dv0"), ("AX_dv0",)),
        (("ax(dv=1)", "ax_dv1"), ("AX_dv1",)),
        (("ax(dv=2)", "ax_dv2"), ("AX_dv2",)),
        (("ax(dv=3)", "ax_dv3"), ("AX_dv3",)),
        (("xx",), ("XX",)),
    )
    key = systems.strip().lower()
    for spellings, tokens in alias_groups:
        if key in spellings:
            # Hand out a fresh list so callers may mutate the result freely.
            return list(tokens)

    warnings.warn(f"normalize_cn_systems_arg: unrecognized system selector {systems!r}, this will be omitted.")
    return [systems]
[docs]
def from_user_linelist(
    df: pd.DataFrame,
    *,
    lam_col: str,
    A_col: str,
    upper_id_col: str,
    lower_id_col: str,
    g_upper_col: str,
    g_lower_col: str,
    lower_es_col: str | None = None,
    lower_v_col: str | None = None,
    lower_J_col: str | None = None,
    lower_sym_col: str | None = None,
    E_lower_cm1_col: str | None = None,
) -> pd.DataFrame:
    r"""Convert a user line list into the normalized transition schema.

    Rows with missing or invalid values in any requested column are dropped
    (a warning reports how many) before the output table is built.

    Parameters
    ----------
    df : pandas.DataFrame
        Input line list table.
    lam_col : str
        Wavelength column in vacuum :math:`\AA`; values must be finite and > 0.
    A_col : str
        Einstein :math:`A` coefficient column in :math:`\mathrm{s}^{-1}`;
        values must be finite and >= 0.
    upper_id_col : str
        Upper-state identifier column (must be non-null).
    lower_id_col : str
        Lower-state identifier column (must be non-null).
    g_upper_col : str
        Upper-state degeneracy column; values must be finite and > 0.
    g_lower_col : str
        Lower-state degeneracy column; values must be finite and > 0.
    lower_es_col : str, optional, default None
        Optional lower electronic-state column (stripped and upper-cased).
    lower_v_col : str, optional, default None
        Optional lower vibrational-level column (numeric).
    lower_J_col : str, optional, default None
        Optional lower rotational-level column (numeric).
    lower_sym_col : str, optional, default None
        Optional column holding a composite lower-state spin-orbit/parity
        label. For Brooke-style line lists this is typically built from the
        lower-state :math:`F''`, :math:`p''` and :math:`eS''` columns.
    E_lower_cm1_col : str, optional, default None
        Optional lower-state energy column in :math:`\mathrm{cm}^{-1}`. A pair
        of levels will use these values to get the :math:`\Delta E` for the
        collisions.

    Returns
    -------
    pandas.DataFrame
        Normalized table with columns ``lambda_vac_A``, ``A_ul``, ``upper_id``,
        ``lower_id``, ``g_upper``, ``g_lower``, ``E_cm1`` and, for every
        optional column supplied, ``lower_es``, ``lower_v``, ``lower_J``,
        ``lower_sym``, ``E_lower_cm1``. ``E_cm1`` is the transition energy
        derived from the wavelength, whereas ``E_lower_cm1`` is the energy of
        the lower state with respect to the ground state.

    Raises
    ------
    ValueError
        If a required column, or a named optional column, is not in ``df``.
    """

    def _numeric(frame: pd.DataFrame, column: str) -> pd.Series:
        # Unparseable entries become NaN so they fail the finiteness checks.
        return pd.to_numeric(frame[column], errors="coerce")

    mandatory = [lam_col, A_col, upper_id_col, lower_id_col, g_upper_col, g_lower_col]
    missing = [name for name in mandatory if name not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Row-wise validity of the mandatory columns.
    wavelength = _numeric(df, lam_col)
    einstein_a = _numeric(df, A_col)
    deg_upper = _numeric(df, g_upper_col)
    deg_lower = _numeric(df, g_lower_col)
    keep = (
        np.isfinite(wavelength) & (wavelength > 0)
        & np.isfinite(einstein_a) & (einstein_a >= 0)
        & np.isfinite(deg_upper) & (deg_upper > 0)
        & np.isfinite(deg_lower) & (deg_lower > 0)
        & df[upper_id_col].notna() & df[lower_id_col].notna()
    )

    # Optional columns: (source column, argument name, is numeric).
    optional_columns = (
        (lower_es_col, "lower_es_col", False),
        (lower_v_col, "lower_v_col", True),
        (lower_J_col, "lower_J_col", True),
        (lower_sym_col, "lower_sym_col", False),
        (E_lower_cm1_col, "E_lower_cm1_col", True),
    )
    for source, arg_name, is_numeric in optional_columns:
        if source is None:
            continue
        if source not in df.columns:
            raise ValueError(f"{arg_name}={source!r} not found.")
        if is_numeric:
            keep = keep & np.isfinite(_numeric(df, source))
        else:
            keep = keep & df[source].notna()

    keep = keep.to_numpy()
    n_dropped = int((~keep).sum())
    if n_dropped > 0:
        warnings.warn(
            f"from_user_linelist: dropping {n_dropped} row(s) with missing/invalid values."
        )

    clean = df.iloc[keep].reset_index(drop=True)
    out = pd.DataFrame(index=clean.index)
    out["lambda_vac_A"] = _numeric(clean, lam_col).astype(float)
    out["A_ul"] = _numeric(clean, A_col).astype(float)
    out["upper_id"] = clean[upper_id_col].astype(str)
    out["lower_id"] = clean[lower_id_col].astype(str)
    out["g_upper"] = _numeric(clean, g_upper_col).astype(float)
    out["g_lower"] = _numeric(clean, g_lower_col).astype(float)

    # Angstrom -> cm (1e-8), then wavenumber = 1 / wavelength.
    lam_cm = out["lambda_vac_A"].to_numpy() * 1e-8
    out["E_cm1"] = 1.0 / lam_cm

    if lower_es_col is not None:
        out["lower_es"] = clean[lower_es_col].astype(str).str.strip().str.upper()
    if lower_v_col is not None:
        out["lower_v"] = _numeric(clean, lower_v_col).astype(float)
    if lower_J_col is not None:
        out["lower_J"] = _numeric(clean, lower_J_col).astype(float)
    if lower_sym_col is not None:
        out["lower_sym"] = clean[lower_sym_col].astype(str).str.strip()
    if E_lower_cm1_col is not None:
        out["E_lower_cm1"] = _numeric(clean, E_lower_cm1_col).astype(float)
    return out
[docs]
def make_sym(F, p, use_omega: bool = False, es: Optional[str] = None) -> str:
    """Build a compact CN-style symmetry label.

    Parameters
    ----------
    F : Any
        Spin component or branch label. Converted with ``int`` when possible;
        otherwise it is embedded in the label as given.
    p : Any
        Parity label. Only the first character of its stripped, lower-cased
        string form is used.
    use_omega : bool, optional, default False
        Whether to emit Omega-style labels for states whose ``es`` label
        starts with ``"A"``.
    es : str, optional, default None
        Electronic-state label.

    Returns
    -------
    str
        ``"F<F>_<p>"``, or ``"Ω3/2_<p>"`` / ``"Ω1/2_<p>"`` when ``use_omega``
        is set and ``es`` starts with ``"A"`` (``F == 1`` gives ``Ω3/2``, any
        other value gives ``Ω1/2``). ``<p>`` is ``"?"`` when the parity is
        ``None``, empty, or whitespace only.
    """
    # Strip before testing for emptiness, so that a whitespace-only parity is
    # reported as unknown ("?") instead of yielding an empty tag.
    parity = "" if p is None else str(p).strip().lower()
    ptag = parity[:1] or "?"

    try:
        Fint = int(F)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")). Fall back to the raw label, as is
        # already done for non-numeric input.
        Fint = F

    if use_omega and str(es).strip().upper().startswith("A"):
        comp = "Ω3/2" if Fint == 1 else "Ω1/2"
        return f"{comp}_{ptag}"
    return f"F{Fint}_{ptag}"
[docs]
def from_cn_brooke(
    df: pd.DataFrame,
    *,
    lam_col: str = "lambda_vac_A_from_Cal",
    A_col: str = "A",
    use_omega_labels: bool = False,
    E_lower_col: str = "E''",
) -> pd.DataFrame:
    r"""Convert a Brooke CN line list (e.g. the output of :func:`~cometspec.helper.load_cn_linelist`) to the normalized schema.

    Rows with missing or invalid values in any required column are dropped
    (a warning reports how many). If no valid row remains, an empty table
    with the output columns is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Brooke-format CN line list. Besides ``lam_col``, ``A_col`` and
        ``E_lower_col`` it must contain the columns ``F'``, ``p'``, ``eS'``,
        ``v'``, ``J'``, ``F''``, ``p''``, ``eS''``, ``v''`` and ``J''``.
    lam_col : str, optional, default "lambda_vac_A_from_Cal"
        Wavelength column in vacuum Angstrom.
    A_col : str, optional, default "A"
        Einstein A coefficient column.
    use_omega_labels : bool, optional, default False
        Use Omega labels for A-state symmetry tags.
    E_lower_col : str, optional, default "E''"
        Lower-state energy column in cm^-1.

    Returns
    -------
    pandas.DataFrame
        Normalized transition table. Each row is one rovibronic transition.

        * ``lambda_vac_A`` (:class:`float`) -- Vacuum wavelength in Å.
        * ``A_ul`` (:class:`float`) -- Einstein :math:`A` coefficient (spontaneous emission rate), in s\ :sup:`-1`.
        * ``upper_id`` (:class:`str`) -- String key identifying the upper level, formatted as ``ES|v=V|J=J|sym=S``.
        * ``lower_id`` (:class:`str`) -- String key identifying the lower level, formatted as ``ES|v=V|J=J|sym=S``. Any lower electronic-state label starting with ``X`` is written as plain ``X``.
        * ``g_upper`` (:class:`float`) -- Upper-level degeneracy, :math:`2J' + 1`.
        * ``g_lower`` (:class:`float`) -- Lower-level degeneracy, :math:`2J'' + 1`.
        * ``E_cm1`` (:class:`float`) -- Transition energy in cm\ :sup:`-1`, computed as :math:`1/\lambda_{\mathrm{vac}}`.
        * ``lower_es`` (:class:`str`) -- Lower electronic state label (e.g. ``X``).
        * ``lower_v`` (:class:`float`) -- Lower vibrational quantum number :math:`v''`.
        * ``lower_J`` (:class:`float`) -- Lower rotational quantum number :math:`J''`.
        * ``lower_sym`` (:class:`str`) -- Lower-level symmetry tag (e-f parity and :math:`\Omega` component).
        * ``E_lower_cm1`` (:class:`float`) -- Lower-state energy in cm\ :sup:`-1`, taken directly from ``E_lower_col``. A pair of levels will use these values to get the :math:`\Delta E` for the collisions.

    Raises
    ------
    ValueError
        If required columns are missing from ``df``.
    """
    # NOTE: the docstring above is a raw string because it contains reST/LaTeX
    # backslashes, which are invalid escape sequences in a normal literal.

    def _num(frame: pd.DataFrame, column: str) -> pd.Series:
        # Unparseable entries become NaN so they fail the finiteness checks.
        return pd.to_numeric(frame[column], errors="coerce")

    src_cols = [
        lam_col, A_col,
        "F'", "p'", "eS'", "v'", "J'",
        "F''", "p''", "eS''", "v''", "J''",
        E_lower_col,
    ]
    missing = [c for c in src_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Brooke linelist missing required columns: {missing}")

    lam_s = _num(df, lam_col)
    A_s = _num(df, A_col)
    valid = (
        np.isfinite(lam_s) & (lam_s > 0)
        & np.isfinite(A_s) & (A_s >= 0)
        & np.isfinite(_num(df, "v'")) & np.isfinite(_num(df, "v''"))
        & np.isfinite(_num(df, "J'")) & np.isfinite(_num(df, "J''"))
        & np.isfinite(_num(df, E_lower_col))
        & df["F'"].notna() & df["p'"].notna() & df["eS'"].notna()
        & df["F''"].notna() & df["p''"].notna() & df["eS''"].notna()
    ).to_numpy()

    n_dropped = int((~valid).sum())
    if n_dropped > 0:
        warnings.warn(
            f"from_cn_brooke: dropping {n_dropped} row(s) with missing/invalid values."
        )
    if not valid.any():
        # Nothing usable: return an empty table that still has the schema.
        return pd.DataFrame(columns=[
            "lambda_vac_A", "A_ul", "upper_id", "lower_id",
            "g_upper", "g_lower", "E_cm1",
            "lower_es", "lower_v", "lower_J", "lower_sym", "E_lower_cm1",
        ])

    df = df.iloc[valid].reset_index(drop=True)
    out = pd.DataFrame(index=df.index)
    out["lambda_vac_A"] = _num(df, lam_col).astype(float)
    out["A_ul"] = _num(df, A_col).astype(float)

    # Symmetry tags of the upper and lower levels.
    sym_u = [
        make_sym(F, p, use_omega_labels, es)
        for F, p, es in zip(df["F'"], df["p'"], df["eS'"])
    ]
    sym_l = [
        make_sym(F, p, use_omega_labels, es)
        for F, p, es in zip(df["F''"], df["p''"], df["eS''"])
    ]

    J_u = _num(df, "J'").astype(float)
    J_l = _num(df, "J''").astype(float)
    V_u = _num(df, "v'").astype(float)
    V_l = _num(df, "v''").astype(float)

    out["upper_id"] = [
        f"{str(es).strip().upper()}|v={int(round(v))}|J={J:.6g}|sym={s}"
        for es, v, J, s in zip(df["eS'"], V_u, J_u, sym_u)
    ]
    # Lower levels: any label starting with "X" is collapsed to plain "X".
    out["lower_id"] = [
        f"{'X' if str(es).strip().upper().startswith('X') else str(es).strip().upper()}|"
        f"v={int(round(v))}|J={J:.6g}|sym={s}"
        for es, v, J, s in zip(df["eS''"], V_l, J_l, sym_l)
    ]

    out["g_upper"] = 2.0 * J_u + 1.0
    out["g_lower"] = 2.0 * J_l + 1.0

    # Angstrom -> cm (1e-8), then wavenumber = 1 / wavelength.
    lam_cm = out["lambda_vac_A"].to_numpy() * 1e-8
    out["E_cm1"] = 1.0 / lam_cm

    out["lower_es"] = df["eS''"].astype(str).str.strip().str.upper()
    out["lower_v"] = V_l
    out["lower_J"] = J_l
    out["lower_sym"] = np.asarray(sym_l, dtype=str)
    out["E_lower_cm1"] = _num(df, E_lower_col).astype(float)
    return out
[docs]
def filter_cn_systems(
    df_all: pd.DataFrame,
    *,
    systems: str | Sequence[str] | None = None,
    lambda_min_A: float = 2990.001,
    lambda_max_A: float = 10009.998,
    A_min: float | None = 1e4,
    lam_col: str = "lambda_vac_A_from_Cal",
) -> pd.DataFrame:
    """Select rows of a Brooke CN line list by band system, wavelength and Einstein A.

    Unless the ``"ALL"`` token is requested, only transitions whose lower
    electronic state starts with ``X`` are kept and the band-system masks are
    OR-ed together. The wavelength window and the ``A`` threshold are applied
    afterwards in every case. If none of the requested tokens is a known band
    system, an empty table is returned.

    Parameters
    ----------
    df_all : pandas.DataFrame
        Full Brooke/Sneden CN table. The columns ``eS'``, ``eS''``, ``v'``,
        ``v''``, ``A`` and ``lam_col`` are read as needed.
    systems : str or Sequence[str], optional, default None
        System selector(s) accepted by :func:`normalize_cn_systems_arg`.
    lambda_min_A : float, optional, default 2990.001
        Minimum wavelength in Angstrom (inclusive).
    lambda_max_A : float, optional, default 10009.998
        Maximum wavelength in Angstrom (inclusive).
    A_min : float, optional, default 1e4
        Minimum Einstein A threshold (inclusive), or ``None`` to disable.
    lam_col : str, optional, default "lambda_vac_A_from_Cal"
        Wavelength column name.

    Returns
    -------
    pandas.DataFrame
        Filtered CN line list with a fresh ``0..n-1`` index.
    """
    tokens = normalize_cn_systems_arg(systems)
    table = df_all.copy()

    if "ALL" not in tokens:
        lower_is_x = table["eS''"].astype(str).str.upper().str.startswith("X")
        table = table[lower_is_x]

        def _ax_sequence(frame: pd.DataFrame, dv: int) -> pd.Series:
            # A–X transitions with |v' - v''| == dv.
            return (frame["eS'"] == "A") & (np.abs(frame["v'"] - frame["v''"]) == dv)

        # Masks are built lazily, so only the columns that a requested token
        # really needs are touched.
        mask_builders = {
            "BX00": lambda frame: (frame["eS'"] == "B") & (frame["v'"] == 0) & (frame["v''"] == 0),
            "AX_dv0": lambda frame: _ax_sequence(frame, 0),
            "AX_dv1": lambda frame: _ax_sequence(frame, 1),
            "AX_dv2": lambda frame: _ax_sequence(frame, 2),
            "AX_dv3": lambda frame: _ax_sequence(frame, 3),
            "XX": lambda frame: (frame["eS'"] == "X"),
        }
        masks = [build(table) for token, build in mask_builders.items() if token in tokens]
        if not masks:
            return table.iloc[0:0].reset_index(drop=True)

        selected = masks[0]
        for extra in masks[1:]:
            selected = selected | extra
        table = table[selected]

    in_window = (table[lam_col] >= lambda_min_A) & (table[lam_col] <= lambda_max_A)
    table = table[in_window]
    if A_min is not None:
        table = table[table["A"] >= float(A_min)]
    return table.reset_index(drop=True)
[docs]
def load_default_transitions(
    *,
    isotopologues: str | Sequence[str] = "12C14N",
    systems: str | Sequence[str] | None = None,
    A_min: float | None = 1e4,
    lambda_min_A: float = 2990.001,
    lambda_max_A: float = 10009.998,
    use_omega_labels: bool = False,
    line_paths: dict[str, str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Load and normalize packaged default transitions per isotopologue. The options are **"12C2", "12C13C", "13C2", "12C14N", "13C14N", "12C15N", "Fe".**

    Each label is dispatched on its form:

    - CN-like labels (digits + ``C`` + digits + ``N``): the Brooke-format file
      ``data/CN/<iso>.txt`` is loaded, filtered with :func:`filter_cn_systems`
      and converted with :func:`from_cn_brooke`. If no packaged file exists
      for the label, a warning is emitted and the ``12C14N`` list is used.
    - Labels containing ``Fe``: ``data/fe_normalized.csv`` is loaded.
    - C2-like labels (e.g. ``12C2``, ``12C13C``): ``data/C2/<name>.h5`` is
      loaded; loading fails if the file does not exist.

    If CN is chosen, the systems to include can be given with ``systems``. The
    default is BX(0,0) plus AX(|Δv|=1). ``systems`` is one of, or a list of,
    the following strings:

    - "both" or "bx+ax": BX(0,0), AX(Δv=±1), AX(Δv=±2) and AX(Δv=±3)
    - "all": all systems in the Brooke linelist (including minor ones, this will lead to extremely high computation times)
    - "bx", "b-x", "bx(0,0)", "bx00", "bx_00" or "b_x_00": BX(0,0) only
    - "ax" or "a-x": AX(Δv=±1) and AX(Δv=±2)
    - "ax(dv=0)", "ax_dv0": AX(Δv=0) only
    - "ax(dv=1)", "ax_dv1": AX(Δv=±1) only
    - "ax(dv=2)", "ax_dv2": AX(Δv=±2) only
    - "ax(dv=3)", "ax_dv3": AX(Δv=±3) only
    - "xx": all X-X transitions

    The references of each line list are given below [1]_ [2]_ [3]_ [4]_ [5]_. Additional filters applied to the line lists are explained in [6]_.

    .. note::
        Rows on the line lists with missing or invalid values in any of the necessary columns are dropped.

    Parameters
    ----------
    isotopologues : str or Sequence[str], optional, default "12C14N"
        One or more isotopologue labels.
    systems : str or Sequence[str], optional, default None
        CN system selector(s). Only used for CN-like labels.
    A_min : float, optional, default 1e4
        Minimum Einstein A threshold, or ``None`` to disable. CN lists keep
        ``A >= A_min``; Fe and C2 lists keep ``A_ul > A_min``.
    lambda_min_A : float, optional, default 2990.001
        Minimum wavelength in Angstrom. Only applied to CN line lists.
    lambda_max_A : float, optional, default 10009.998
        Maximum wavelength in Angstrom. Only applied to CN line lists.
    use_omega_labels : bool, optional, default False
        Use Omega labels for A-state symmetry tags (CN only).
    line_paths : dict[str, str], optional, default None
        Optional mapping of isotopologue to explicit file path. Only
        consulted for CN-like labels; the path is used as given.

    Returns
    -------
    dict[str, pandas.DataFrame]
        Dictionary mapping isotopologue label to normalized transition table. The keys are exactly the entries in ``isotopologues``; the values are DataFrames with the same schema as described in :func:`from_cn_brooke` or :func:`from_user_linelist`.

    Raises
    ------
    ValueError
        If a label is neither CN-like, C2-like, nor contains ``"Fe"``.

    References
    ----------
    .. [1] Brooke, J. S. A., Ram, R. S., Western, C. M., et al. 2014, ApJS, 210, 23 (`link <https://doi.org/10.1088/0067-0049/210/2/23>`__).
    .. [2] Sneden, C., Lucatello, S., Ram, R. S., Brooke, J. S. A., & Bernath, P. 2014, ApJS, 214, 26 (`link <https://doi.org/10.1088/0067-0049/214/2/26>`__).
    .. [3] Yurchenko, S. N., Szabó, I., Pyatenko, E., & Tennyson, J. 2018, MNRAS, 480, 3397 (`link <https://doi.org/10.1093/mnras/sty2050>`__).
    .. [4] McKemmish, L. K., Syme, A.-M., Borsovszky, J., et al. 2020, MNRAS, 497, 1081 (`link <https://doi.org/10.1093/mnras/staa1954>`__).
    .. [5] van Hoof, P. A. M. 2018, Galaxies, 6, 63 (`link <https://doi.org/10.3390/galaxies6020063>`__).
    .. [6] Our Publication (To fill upon acceptance).
    """
    iso_list = _as_list(isotopologues)
    sys_tokens = normalize_cn_systems_arg(systems)
    out: dict[str, pd.DataFrame] = {}

    def _apply_a_min(tab: pd.DataFrame) -> pd.DataFrame:
        # ``None`` disables the cut, as it does for the CN lists.
        # NOTE(review): strict ">" here versus ">=" in filter_cn_systems;
        # kept as is so the selected lines do not change -- confirm intent.
        if A_min is None:
            return tab
        return tab[tab["A_ul"] > A_min]

    for iso in iso_list:
        if re.match(r"^\d+C\d+N$", iso):
            if line_paths is not None and iso in line_paths:
                path = line_paths[iso]
            else:
                cn_file = DATA_DIR / "CN" / f"{iso}.txt"
                if not cn_file.exists():
                    # Documented fallback: unknown CN isotopologues reuse the
                    # main-isotopologue list. Warn so it is not silent.
                    warnings.warn(
                        f"load_default_transitions: no packaged CN line list for {iso!r}; "
                        "falling back to '12C14N'."
                    )
                    cn_file = DATA_DIR / "CN" / "12C14N.txt"
                path = str(cn_file)
            df_all = helper.load_cn_linelist(path)
            df_filt = filter_cn_systems(
                df_all,
                systems=sys_tokens,
                lambda_min_A=lambda_min_A,
                lambda_max_A=lambda_max_A,
                A_min=A_min,
                lam_col="lambda_vac_A_from_Cal",
            )
            out[iso] = from_cn_brooke(
                df_filt,
                lam_col="lambda_vac_A_from_Cal",
                A_col="A",
                use_omega_labels=use_omega_labels,
            )
        elif "Fe" in iso:
            tab = pd.read_csv(DATA_DIR / "fe_normalized.csv")
            out[iso] = _drop_invalid_normalized_rows(_apply_a_min(tab), label=iso)
        elif re.match(r"^\d+C\d+C$", iso) or re.match(r"^\d+C\d+$", iso):
            # Prefer the canonical file name, then the label as given.
            canon = canonical_diatomic_name(iso) or iso
            path = DATA_DIR / f"C2/{canon}.h5"
            if not path.exists():
                path = DATA_DIR / f"C2/{iso}.h5"
            tab = pd.read_hdf(path, key="/lines")
            out[iso] = _drop_invalid_normalized_rows(_apply_a_min(tab), label=iso)
        else:
            raise ValueError(
                f"No default linelist available for isotopologue {iso!r}. "
                "Supported defaults are CN-like labels (e.g. '12C14N'), "
                "C2-like labels (e.g. '12C2', '12C13C'), or labels containing "
                "'Fe'. Provide a custom linelist via the `linelists` argument."
            )
    return out
[docs]
def resolve_linelists_with_defaults(
    linelists: pd.DataFrame | dict[str, pd.DataFrame] | Sequence[pd.DataFrame] | None,
    iso_list: Sequence[str],
    *,
    systems: str | Sequence[str] | None = None,
    A_min: float = 1e4,
    lambda_min_A: float = 2990.001,
    lambda_max_A: float = 10009.998,
    use_omega_labels: bool = False,
    line_paths: dict[str, str] | None = None,
) -> dict[str, pd.DataFrame]:
    """Pair user-supplied line lists with isotopologues, filling gaps from packaged defaults.

    To mix custom and default line lists positionally, order ``iso_list`` so
    that the isotopologues with a supplied line list come first.

    Resolution rules:

    - ``linelists is None`` -> every iso is loaded from packaged defaults via
      :func:`load_default_transitions`.
    - Single :class:`pandas.DataFrame` -> assigned to ``iso_list[0]``; the
      remaining isotopologues fall back to defaults.
    - :class:`dict` mapping iso label to DataFrame -> entries are used for
      matching labels in ``iso_list``; labels missing from the dict fall back
      to defaults. Keys not in ``iso_list`` are ignored.
    - ``list``/``tuple`` of DataFrames -> paired positionally with the first
      ``len(linelists)`` entries of ``iso_list``; the rest fall back to
      defaults.

    Loading a default for an isotopologue without a packaged file (e.g.
    ``"COH"``) raises :class:`ValueError` from
    :func:`load_default_transitions`.

    Parameters
    ----------
    linelists : pandas.DataFrame or dict[str, pandas.DataFrame] or Sequence[pandas.DataFrame] or None
        User-supplied line list(s). See the resolution rules above.
    iso_list : Sequence[str]
        Isotopologue labels, in the order they should be returned.
    systems : str or Sequence[str], optional, default None
        CN system selector(s) for default CN line lists. See
        :func:`normalize_cn_systems_arg` for accepted forms.
    A_min : float, optional, default 1e4
        Minimum Einstein A threshold for default line lists.
    lambda_min_A : float, optional, default 2990.001
        Minimum wavelength in Angstrom for default line lists.
    lambda_max_A : float, optional, default 10009.998
        Maximum wavelength in Angstrom for default line lists.
    use_omega_labels : bool, optional, default False
        Use Omega labels for A-state symmetry tags in default CN line lists.
    line_paths : dict[str, str], optional, default None
        Optional mapping of isotopologue to explicit file path, forwarded to
        :func:`load_default_transitions`.

    Returns
    -------
    dict[str, pandas.DataFrame]
        ``{iso: DataFrame}`` ordered exactly as ``iso_list``.

    Raises
    ------
    ValueError
        If ``iso_list`` is empty, or more line lists than isotopologues are
        given as a list/tuple.
    TypeError
        If ``linelists`` has an unsupported type.
    """
    iso_list = list(iso_list)
    if not iso_list:
        raise ValueError("isotopologues is empty.")

    # Step 1: work out which isotopologues the user covered.
    if linelists is None:
        resolved: dict[str, pd.DataFrame] = {}
    elif isinstance(linelists, pd.DataFrame):
        resolved = {iso_list[0]: linelists}
    elif isinstance(linelists, dict):
        resolved = {label: linelists[label] for label in iso_list if label in linelists}
    elif isinstance(linelists, (list, tuple)):
        if len(linelists) > len(iso_list):
            raise ValueError(
                f"Got {len(linelists)} linelists for {len(iso_list)} isotopologues; "
                "too many."
            )
        resolved = dict(zip(iso_list, linelists))
    else:
        raise TypeError(
            "linelists must be None, a DataFrame, a dict keyed by isotopologue, "
            f"or a sequence of DataFrames; got {type(linelists).__name__}."
        )

    # Step 2: load packaged defaults for everything that is still uncovered.
    uncovered = [label for label in iso_list if label not in resolved]
    if uncovered:
        packaged = load_default_transitions(
            isotopologues=uncovered,
            systems=systems,
            A_min=A_min,
            lambda_min_A=lambda_min_A,
            lambda_max_A=lambda_max_A,
            use_omega_labels=use_omega_labels,
            line_paths=line_paths,
        )
        resolved.update({label: packaged[label] for label in uncovered})

    # Step 3: hand the result back in the caller's order.
    return {label: resolved[label] for label in iso_list}
[docs]
def default_linelist_source(iso: str) -> str:
    """Return the file path that would be loaded for ``iso`` from packaged defaults.

    No data is read; the file system is only checked for existence.

    Parameters
    ----------
    iso : str
        Isotopologue label.

    Returns
    -------
    str
        Path of the packaged line list for ``iso``:

        - CN-like labels (digits + ``C`` + digits + ``N``):
          ``data/CN/<iso>.txt``, or ``data/CN/12C14N.txt`` when no file
          exists for that label.
        - Labels containing ``"Fe"``: ``data/fe_normalized.csv``.
        - C2-like labels (e.g. ``12C2``, ``12C13C``): ``data/C2/<name>.h5``,
          where ``<name>`` is the canonical diatomic name if that file
          exists and the label as given otherwise.

    Raises
    ------
    ValueError
        If ``iso`` does not match any supported default pattern (CN-like,
        C2-like, or containing ``"Fe"``).
    """
    # Same location as the module-level data directory; resolved under a
    # local, non-shadowing name.
    data_dir = Path(__file__).resolve().parent / "data"

    if re.match(r"^\d+C\d+N$", iso):
        cn_file = data_dir / "CN" / f"{iso}.txt"
        if not cn_file.exists():
            # Unknown CN isotopologues fall back to the 12C14N list.
            cn_file = data_dir / "CN" / "12C14N.txt"
        return str(cn_file)

    if "Fe" in iso:
        return str(data_dir / "fe_normalized.csv")

    if re.match(r"^\d+C\d+C$", iso) or re.match(r"^\d+C\d+$", iso):
        # Prefer the canonical file name, then the label as given.
        canon = canonical_diatomic_name(iso) or iso
        path = data_dir / f"C2/{canon}.h5"
        if not path.exists():
            path = data_dir / f"C2/{iso}.h5"
        return str(path)

    raise ValueError(
        f"No default linelist available for isotopologue {iso!r}."
    )
[docs]
def linelist_origins(
    linelists: pd.DataFrame | dict[str, pd.DataFrame] | Sequence[pd.DataFrame] | None,
    iso_list: Sequence[str],
    *,
    line_paths: dict[str, str] | None = None,
) -> dict[str, str]:
    """Return a per-isotopologue origin string (file) for the configured line lists.

    Mirrors the resolution rules of :func:`resolve_linelists_with_defaults`
    without loading any data:

    - Entries supplied by the user (single DataFrame, dict entry, or
      positional list/tuple slot) are reported as
      ``"custom (user-provided)"``.
    - Otherwise, entries with an explicit override in ``line_paths`` are
      reported as that path.
    - Otherwise the path returned by :func:`default_linelist_source` is used.

    Parameters
    ----------
    linelists : pandas.DataFrame or dict[str, pandas.DataFrame] or Sequence[pd.DataFrame] or None
        User-supplied line list(s). See the resolution rules in
        :func:`resolve_linelists_with_defaults`.
    iso_list : Sequence[str]
        Isotopologue labels, in the order they should be returned.
    line_paths : dict[str, str], optional, default None
        Optional mapping of isotopologue to explicit file path. It is only
        consulted for isotopologues without a user-supplied line list.

    Returns
    -------
    dict[str, str]
        Mapping of isotopologue label to origin string. The keys are exactly
        the entries of ``iso_list``, in that order.

    Raises
    ------
    TypeError
        If ``linelists`` has an unsupported type.
    """
    iso_list = list(iso_list)

    # Which labels are covered by user data?
    if linelists is None:
        user_isos: set[str] = set()
    elif isinstance(linelists, pd.DataFrame):
        # A single table belongs to the first label, if there is one.
        user_isos = set(iso_list[:1])
    elif isinstance(linelists, dict):
        user_isos = {label for label in iso_list if label in linelists}
    elif isinstance(linelists, (list, tuple)):
        user_isos = set(iso_list[: len(linelists)])
    else:
        raise TypeError(
            "linelists must be None, a DataFrame, a dict keyed by isotopologue, "
            f"or a sequence of DataFrames; got {type(linelists).__name__}."
        )

    def _origin(label: str) -> str:
        if label in user_isos:
            return "custom (user-provided)"
        if line_paths is not None and label in line_paths:
            return str(line_paths[label])
        return default_linelist_source(label)

    return {label: _origin(label) for label in iso_list}
def _drop_invalid_normalized_rows(tab: pd.DataFrame, *, label: str = "") -> pd.DataFrame:
"""Drop rows of an already-normalized linelist that have missing/invalid values.
Numeric columns must be finite; string/id columns must be non-null. Wavelength
and degeneracy columns must additionally be > 0; A_ul must be >= 0. Lines that
fail any check are dropped (with a warning) instead of silently propagating
NaN into the rate matrix or the collision scaffold.
Parameters
----------
tab : pandas.DataFrame
Normalized transition table to check.
label : str, optional, default ""
Optional label to include in the warning message for dropped rows.
Returns
-------
pandas.DataFrame
Cleaned table with invalid rows dropped and index reset. If the input table is empty or None, returns an empty table with the same columns and a reset index.
"""
if tab is None or len(tab) == 0:
return tab.reset_index(drop=True) if tab is not None else tab
numeric_required = {
"lambda_vac_A": ("positive", True),
"A_ul": ("nonneg", True),
"g_upper": ("positive", False),
"g_lower": ("positive", False),
"lower_v": ("finite", False),
"lower_J": ("finite", False),
"E_lower_cm1": ("finite", False),
}
string_required = ("upper_id", "lower_id", "lower_es", "lower_sym")
valid = pd.Series(True, index=tab.index)
for col, (kind, _) in numeric_required.items():
if col not in tab.columns:
continue
s = pd.to_numeric(tab[col], errors="coerce")
m = np.isfinite(s)
if kind == "positive":
m = m & (s > 0)
elif kind == "nonneg":
m = m & (s >= 0)
valid &= m
for col in string_required:
if col in tab.columns:
valid &= tab[col].notna()
valid_arr = valid.to_numpy()
n_dropped = int((~valid_arr).sum())
if n_dropped > 0:
warnings.warn(
f"load_default_transitions[{label}]: dropping {n_dropped} row(s) with missing/invalid values."
)
return tab.iloc[valid_arr].reset_index(drop=True)
[docs]
def attach_pumping_and_labels(
    df: pd.DataFrame,
    pumping: Any,
    *,
    line_v_kms: float = 0.0,
    line_dlam_A: float = 0.0,
    lsf_for_Jnu: Optional[Callable[[np.ndarray], np.ndarray]] = None,
    lam_col: str = "lambda_vac_A",
) -> Table:
    """Attach the pumping flux seen at each (shifted) line wavelength to a transition table.

    Line wavelengths are first Doppler-shifted by ``line_v_kms`` (factor
    ``1 + v/c``) and then offset by ``line_dlam_A``. Transitions whose shifted
    wavelength lies outside the wavelength range of ``pumping`` are dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Normalized transition DataFrame.
    pumping : Any
        Pumping spectrum with ``WAVE`` and ``FLUX`` columns. The flux is
        tagged as erg s^-1 cm^-2 Angstrom^-1.
        NOTE(review): ``numpy.interp`` is used on ``WAVE``, which presumably
        has to be sorted in increasing order -- confirm with the caller.
    line_v_kms : float, optional, default 0.0
        Doppler velocity shift applied to line wavelengths, in km/s.
    line_dlam_A : float, optional, default 0.0
        Additive wavelength shift in Angstrom.
    lsf_for_Jnu : Callable[[numpy.ndarray], numpy.ndarray], optional, default None
        Optional kernel, evaluated on ``WAVE - line wavelength``, used as
        weights to average the flux around each line. Non-finite weights
        count as zero; if the weights do not sum to a positive value, plain
        interpolation is used for that line. Without a kernel the flux is
        linearly interpolated at the line wavelength.
    lam_col : str, optional, default "lambda_vac_A"
        Input wavelength column name in ``df``.

    Returns
    -------
    astropy.table.Table
        The retained rows of ``df`` plus the columns ``Wave_vac_AA``,
        ``Frequency_Hz``, ``F_lambda_at_comet_erg_s_cm2_AA`` and
        ``J_nu_erg_cm2_s_Hz_sr`` (``F_nu / (4 pi)`` per steradian).
    """
    from .rates import _as_array

    # Shifted line wavelengths, on a private copy of the input column.
    lam = np.array(df[lam_col], dtype=float)
    if line_v_kms != 0.0:
        lam *= (1.0 + line_v_kms / const.c.to("km/s").value)
    if line_dlam_A != 0.0:
        lam += line_dlam_A

    wave_AA = _as_array(pumping, "WAVE")
    F_lambda = _as_array(pumping, "FLUX") * (u.erg / (u.s * u.cm**2 * u.AA))

    # Keep only the lines that the pumping spectrum covers.
    covered = (lam >= wave_AA.min()) & (lam <= wave_AA.max())
    df = df.reset_index(drop=True)[covered]
    lam = lam[covered]
    lam_q = lam * u.AA

    lines = Table.from_pandas(df.copy())
    lines["Wave_vac_AA"] = lam
    lines["Frequency_Hz"] = (const.c / lam_q).to(u.Hz)

    def _kernel_weighted_flux(lam0: float) -> float:
        # Weighted mean of the spectrum around one line.
        weights = np.asarray(lsf_for_Jnu(wave_AA - lam0), float)
        weights = np.where(np.isfinite(weights), weights, 0.0)
        total = weights.sum()
        if total <= 0.0:
            return np.interp(lam0, wave_AA, F_lambda.value)
        return np.sum(F_lambda.value * weights) / total

    if lsf_for_Jnu is None:
        flux_values = np.interp(lam, wave_AA, F_lambda.value)
    else:
        flux_values = np.asarray([_kernel_weighted_flux(lam0) for lam0 in lam])
    F_interp = flux_values * F_lambda.unit
    lines["F_lambda_at_comet_erg_s_cm2_AA"] = F_interp

    # Per-wavelength -> per-frequency flux density, then mean intensity.
    F_nu = F_interp.to(
        u.erg / (u.s * u.cm**2 * u.Hz),
        equivalencies=u.spectral_density(lam_q),
    )
    J_nu = (F_nu / (4.0 * np.pi)) * (1.0 / u.sr)
    lines["J_nu_erg_cm2_s_Hz_sr"] = J_nu.to(u.erg / (u.cm**2 * u.s * u.Hz * u.sr))
    return lines