Source code for euclid_dsps.filters

"""Filter loading helpers."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np


[docs] @dataclass(frozen=True) class FilterCurve: name: str wave: np.ndarray transmission: np.ndarray source: str @property def effective_wavelength(self) -> float: """Transmission-weighted central wavelength in Angstrom.""" norm = np.trapezoid(self.transmission, self.wave) if not np.isfinite(norm) or norm <= 0: return float(np.nanmean(self.wave)) return float(np.trapezoid(self.wave * self.transmission, self.wave) / norm)
APPROX_FILTERS_ANGSTROM = { "euclid_vis": (5500.0, 9000.0), "euclid_y": (9200.0, 11400.0), "euclid_j": (11400.0, 13700.0), "euclid_h": (13700.0, 20000.0), "euclid_nisp_y": (9200.0, 11400.0), "euclid_nisp_j": (11400.0, 13700.0), "euclid_nisp_h": (13700.0, 20000.0), "lsst_u": (3200.0, 4000.0), "lsst_g": (4000.0, 5500.0), "lsst_r": (5500.0, 7000.0), "lsst_i": (6900.0, 8200.0), "lsst_z": (8200.0, 9300.0), "lsst_y": (9500.0, 10500.0), }
[docs] def load_filters(band_configs: list[dict[str, Any]]) -> dict[str, FilterCurve]: """Load all filters declared in the config.""" filters: dict[str, FilterCurve] = {} for band in band_configs: name = band["name"] filters[name] = load_filter(name, band.get("filter", {})) return filters
[docs] def load_filter(name: str, filter_config: dict[str, Any]) -> FilterCurve: """Load an exact HDF5/FITS/DAT curve or build an approximate top-hat.""" kind = filter_config.get("kind", "auto") if kind == "hdf5" or ( "path" in filter_config and str(filter_config["path"]).endswith((".h5", ".hdf5")) ): from dsps import load_transmission_curve path = Path(filter_config["path"]) if not path.exists(): raise FileNotFoundError(f"Filter file not found for {name}: {path}") curve = load_transmission_curve(fn=str(path)) return FilterCurve( name=name, wave=np.asarray(curve.wave, dtype=float), transmission=np.asarray(curve.transmission, dtype=float), source=str(path), ) if kind == "fits" or ( "path" in filter_config and str(filter_config["path"]).endswith((".fits", ".fit", ".fts")) ): path = Path(filter_config["path"]) if not path.exists(): raise FileNotFoundError(f"Filter file not found for {name}: {path}") return load_fits_filter(name, path, filter_config) if ( kind == "ascii" or kind == "dat" or ( "path" in filter_config and str(filter_config["path"]).endswith((".dat", ".txt", ".ascii", ".csv")) ) ): path = Path(filter_config["path"]) if not path.exists(): raise FileNotFoundError(f"Filter file not found for {name}: {path}") return load_ascii_filter(name, path, filter_config) if kind in {"auto", "tophat"}: wave_min = filter_config.get("wave_min") wave_max = filter_config.get("wave_max") if wave_min is None or wave_max is None: if name not in APPROX_FILTERS_ANGSTROM: raise ValueError(f"No approximate filter range known for {name}") wave_min, wave_max = APPROX_FILTERS_ANGSTROM[name] n_wave = int(filter_config.get("n_wave", 512)) wave = np.linspace(float(wave_min), float(wave_max), n_wave) transmission = np.ones_like(wave) source = f"approx_tophat_{float(wave_min):.0f}_{float(wave_max):.0f}A" return FilterCurve( name=name, wave=wave, transmission=transmission, source=source ) raise ValueError(f"Unsupported filter kind for {name}: {kind}")
[docs] def load_ascii_filter( name: str, path: Path, filter_config: dict[str, Any] ) -> FilterCurve: """Load two-column ASCII throughput files.""" data = _load_two_column_ascii(path) wave = np.asarray(data[:, 0], dtype=float) transmission = np.asarray(data[:, 1], dtype=float) wave = wave * _wave_unit_to_angstrom_factor( str(filter_config.get("wave_unit", "angstrom")) ) order = np.argsort(wave) wave = wave[order] transmission = np.clip(transmission[order], 0.0, 1.0) mask = np.isfinite(wave) & np.isfinite(transmission) return FilterCurve( name=name, wave=wave[mask], transmission=transmission[mask], source=str(path) )
def _load_two_column_ascii(path: Path) -> np.ndarray: """Read whitespace- or comma-separated two-column numeric tables.""" try: data = np.loadtxt(path) except ValueError: data = np.loadtxt(path, delimiter=",") if data.ndim != 2 or data.shape[1] < 2: raise ValueError(f"Filter file must contain at least two columns: {path}") return np.asarray(data[:, :2], dtype=float)
[docs] def load_fits_filter( name: str, path: Path, filter_config: dict[str, Any] ) -> FilterCurve: """Load Euclid throughput FITS files and convert wavelength to Angstrom.""" from astropy.io import fits data = fits.getdata(path, int(filter_config.get("hdu", 1))) wave_column = filter_config.get("wave_column", "WAVE") throughput_column = filter_config.get("throughput_column", "T_TOTAL") wave = np.asarray(data[wave_column], dtype=float) transmission = np.asarray(data[throughput_column], dtype=float) wave = wave * _wave_unit_to_angstrom_factor( str(filter_config.get("wave_unit", "nm")) ) order = np.argsort(wave) wave = wave[order] transmission = np.clip(transmission[order], 0.0, 1.0) mask = np.isfinite(wave) & np.isfinite(transmission) return FilterCurve( name=name, wave=wave[mask], transmission=transmission[mask], source=str(path) )
def _wave_unit_to_angstrom_factor(unit: str) -> float: normalized = unit.strip().lower() if normalized in {"angstrom", "ang", "aa", "a"}: return 1.0 if normalized in {"nm", "nanometer", "nanometers"}: return 10.0 if normalized in {"micron", "microns", "um"}: return 10_000.0 raise ValueError(f"Unsupported filter wavelength unit: {unit}")