# Source code for stellarphot.io.aavso

"""Writer for the AAVSO Extended File Format used by WebObs.

Implements ensemble photometry submissions (CNAME=ENSEMBLE, CMAG=na) with one
target star and one check star. The data layout follows the spec mirrored in
``stellarphot/io/aavso_submission_schema.yml``.

v1 limitations:
- ``DATE=JD`` only. ``HJD`` and ``EXCEL`` are valid in the header model but
  raise ``NotImplementedError`` from the writer.
- ``MTYPE`` is hardcoded to ``STD`` (calibrated/standardized magnitudes), which
  is the correct value when CNAME=ENSEMBLE.
- ``OBSTYPE`` is hardcoded to ``CCD``.
"""

import io
from pathlib import Path

import numpy as np
from astropy.table import Column, QTable, Table, join
from astropy.time import Time

from stellarphot.settings.aavso_models import AAVSOFilters
from stellarphot.settings.aavso_submission import AAVSOSubmissionHeader

__all__ = ["write_aavso_extended"]


# File suffixes accepted for the destination path. ``write_aavso_extended``
# compares the lowercased suffix of ``path`` against this set and raises
# ``ValueError`` for anything else.
ALLOWED_EXTENSIONS = frozenset({".txt", ".csv", ".tsv"})

# AAVSO data columns in spec order with their max character counts. ``None``
# means the field has no length limit. The AAVSO sample files prepend a row
# of these names with "#" before the data. AIRMASS is special: the spec says
# it should be truncated rather than rejected; ``_enforce_limit`` handles
# that.
#
# Key order matters: the writer iterates this dict both to build the
# "#"-prefixed column-name row and to add the data columns to the output
# table, so the order here is the column order in the written file.
FIELD_LIMITS = {
    "STARID": 30,
    "DATE": 16,
    "MAGNITUDE": 8,
    "MAGERR": 6,
    "FILTER": None,
    "TRANS": None,
    "MTYPE": None,
    "CNAME": 20,
    "CMAG": 8,
    "KNAME": 20,
    "KMAG": 8,
    "AIRMASS": 7,
    "GROUP": 5,
    "CHART": 20,
    "NOTES": None,
}


def _is_valid_filter(value):
    """Return ``True`` when ``AAVSOFilters(value)`` can be constructed.

    A ``ValueError`` from the constructor is taken to mean ``value`` is not a
    recognized filter, in which case ``False`` is returned. Any other
    exception propagates.
    """
    try:
        AAVSOFilters(value)
    except ValueError:
        is_valid = False
    else:
        is_valid = True
    return is_valid


def _enforce_limit(name, value):
    """Check the already-stringified ``value`` against the limit for ``name``.

    Parameters
    ----------
    name : str
        AAVSO column name, looked up in ``FIELD_LIMITS``.
    value : str
        The rendered field text.

    Returns
    -------
    str
        ``value`` unchanged when the field has no limit (or is unknown) or
        fits within it; for ``AIRMASS`` only, ``value`` cut down to the limit.

    Raises
    ------
    ValueError
        If a limited field other than ``AIRMASS`` is too long.
    """
    max_chars = FIELD_LIMITS.get(name)
    fits = max_chars is None or len(value) <= max_chars
    if fits:
        return value
    if name != "AIRMASS":
        raise ValueError(
            f"AAVSO field {name}={value!r} exceeds the {max_chars}-character limit "
            f"(got {len(value)} characters)."
        )
    # AIRMASS is the one field that is truncated instead of rejected.
    return value[:max_chars]


def _require_nonblank(name, value):
    """Strip ``value`` and reject empty/whitespace-only required identifiers."""
    if value is None:
        raise ValueError(f"AAVSO field {name} is required; got None.")
    stripped = str(value).strip()
    if not stripped:
        raise ValueError(
            f"AAVSO field {name} is required; got an empty/whitespace value."
        )
    return stripped


def _reject_delimiter_or_newline(name, value, delimiter):
    """Reject string fields that contain the configured delimiter or a newline."""
    if delimiter in value:
        raise ValueError(
            f"AAVSO field {name}={value!r} contains the configured delimiter "
            f"{delimiter!r}; choose a delimiter that does not appear in the data."
        )
    if "\n" in value or "\r" in value:
        raise ValueError(
            f"AAVSO field {name}={value!r} contains a newline; "
            "AAVSO rows must be a single line."
        )


def _to_float(value):
    """Coerce a value (possibly an astropy ``Quantity``) to a plain float."""
    return float(getattr(value, "value", value))


def _format_mag(value, field_name):
    """Render a required magnitude with four decimal places.

    Raises
    ------
    ValueError
        If the value is NaN or infinite; ``field_name`` labels the field in
        the message.
    """
    number = _to_float(value)
    if np.isfinite(number):
        return f"{number:.4f}"
    raise ValueError(
        f"AAVSO field {field_name} is required but the value is "
        f"non-finite ({value!r}). Drop these rows before exporting."
    )


def _format_magerr(value):
    """Render a magnitude error with three decimals, or ``"na"`` if non-finite."""
    number = _to_float(value)
    return f"{number:.3f}" if np.isfinite(number) else "na"


def _validate_trans(value):
    """``trans`` controls a required YES/NO field; truthiness would silently
    flip a caller's intent (e.g. the string ``"False"`` is truthy)."""
    if not isinstance(value, bool):
        raise TypeError(
            f"trans must be a bool (True or False); got "
            f"{type(value).__name__} ({value!r})."
        )


def _coerce_group(value):
    """Coerce ``value`` to a non-bool ``int`` or ``None`` for the GROUP field.

    Accepts Python ints, numpy integers, integer-valued floats (``5.0``) and
    numeric strings (``"5"``). Rejects ``bool``, non-integer floats, and
    anything that doesn't convert cleanly to a number.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        raise TypeError(f"group must be an int or None; got bool ({value!r}).")
    try:
        as_float = float(value)
    except (TypeError, ValueError) as exc:
        raise TypeError(
            f"group must be an int or None; got {type(value).__name__} ({value!r})."
        ) from exc
    if not np.isfinite(as_float) or as_float != int(as_float):
        raise ValueError(f"group must be an integer value; got {value!r}.")
    return int(as_float)


def _format_airmass(value):
    """Render an airmass with four decimals, or ``"na"`` if it is non-finite."""
    number = _to_float(value)
    if np.isfinite(number):
        return f"{number:.4f}"
    return "na"


def write_aavso_extended(
    phot_data,
    path,
    *,
    header,
    target_star_id,
    target_name,
    check_star_id,
    check_name,
    chart,
    mag_column,
    mag_error_column,
    trans=False,
    group=None,
    notes="na",
    drop_missing_check=True,
):
    """Write an AAVSO Extended File Format submission for ensemble photometry.

    Parameters
    ----------
    phot_data : `stellarphot.PhotometryData`
        Table of photometry results. Must contain at least the target star
        and the check star, paired by ``(date-obs, passband)``.
    path : str or `pathlib.Path`
        Destination file. Must have a ``.txt``, ``.csv`` or ``.tsv`` suffix.
    header : `stellarphot.settings.AAVSOSubmissionHeader`
        Header parameters. Only ``date_format="JD"`` is supported in v1.
    target_star_id : str or int
        The ``star_id`` value identifying the target rows in ``phot_data``.
    target_name : str
        The string written into the ``STARID`` column for every target row.
    check_star_id : str or int
        The ``star_id`` value identifying the check-star rows.
    check_name : str
        The string written into the ``KNAME`` column.
    chart : str
        The AAVSO chart sequence ID written into the ``CHART`` column.
    mag_column : str
        Name of the column in ``phot_data`` containing the calibrated
        magnitude for the target. The same column is read for the check-star
        rows.
    mag_error_column : str
        Name of the column in ``phot_data`` containing the magnitude error.
    trans : bool, optional
        ``True`` to emit ``TRANS=YES``, ``False`` (default) for ``TRANS=NO``.
    group : int or None, optional
        Optional grouping identifier. ``None`` (default) emits ``GROUP=na``.
    notes : str, optional
        Text written into the ``NOTES`` column. Defaults to ``"na"``.
    drop_missing_check : bool, optional
        How to handle target rows that have no check-star observation at the
        same ``(date-obs, passband)``. ``True`` (default) silently drops
        those target rows; ``False`` raises ``ValueError``. If dropping
        leaves no rows to write, ``ValueError`` is raised regardless.

    Returns
    -------
    `pathlib.Path`
        The destination path, converted to a ``Path``.

    Raises
    ------
    TypeError
        If ``header`` is not an ``AAVSOSubmissionHeader``, ``trans`` is not a
        ``bool``, or ``group`` cannot be interpreted as a number.
    NotImplementedError
        If ``header.date_format`` is anything other than ``"JD"``.
    ValueError
        If the target and check star IDs are equal; a magnitude column is
        missing from ``phot_data``; ``path`` has an unsupported suffix; a
        required name is blank; a field contains the delimiter or a newline;
        either star has no rows; a target passband is not a valid AAVSO
        filter; check-star observations are missing (see
        ``drop_missing_check``); a required magnitude is non-finite; or a
        rendered field exceeds its character limit.
    """
    if not isinstance(header, AAVSOSubmissionHeader):
        raise TypeError(
            "header must be an AAVSOSubmissionHeader instance; "
            f"got {type(header).__name__}."
        )
    if header.date_format != "JD":
        raise NotImplementedError(
            f"AAVSO writer only supports DATE=JD in this release; "
            f"got date_format={header.date_format!r}."
        )
    if target_star_id == check_star_id:
        raise ValueError(
            "target_star_id and check_star_id must be different; "
            f"got {target_star_id!r} for both."
        )
    for col in (mag_column, mag_error_column):
        if col not in phot_data.colnames:
            raise ValueError(
                f"Column {col!r} is not in phot_data; "
                f"available columns: {phot_data.colnames}"
            )
    _validate_trans(trans)
    group = _coerce_group(group)

    path = Path(path)
    if path.suffix.lower() not in ALLOWED_EXTENSIONS:
        raise ValueError(
            f"AAVSO submission file must have one of {sorted(ALLOWED_EXTENSIONS)} "
            f"extensions; got {path.suffix!r}."
        )

    delimiter = header.data_delimiter

    # Required identifier fields supplied by the caller. The AAVSO spec
    # forbids leading/trailing whitespace and empty values; we strip and
    # then refuse to write a row with a blank required field.
    target_name = _require_nonblank("target_name", target_name)
    check_name = _require_nonblank("check_name", check_name)
    chart = _require_nonblank("chart", chart)

    # NOTES is optional; "na" is the spec's missing value. Strip then fall
    # back to "na" so users can pass " " without producing a blank field.
    notes = str(notes).strip() if notes is not None else ""
    if not notes:
        notes = "na"

    # Reject values that would collide with the delimiter or break the row
    # structure. Applies to every user-controlled string field.
    for field_name, field_value in (
        ("target_name", target_name),
        ("check_name", check_name),
        ("chart", chart),
        ("notes", notes),
    ):
        _reject_delimiter_or_newline(field_name, field_value, delimiter)

    target_mask = phot_data["star_id"] == target_star_id
    check_mask = phot_data["star_id"] == check_star_id
    if not target_mask.any():
        raise ValueError(f"No rows in phot_data have star_id={target_star_id!r}.")
    if not check_mask.any():
        raise ValueError(f"No rows in phot_data have star_id={check_star_id!r}.")

    # Reject invalid filters before doing any heavier work. Only the target
    # rows' passbands are checked, since only those are written to FILTER.
    for passband in set(phot_data["passband"][target_mask]):
        if not _is_valid_filter(passband):
            raise ValueError(
                f"Row passband {passband!r} is not a valid AAVSO filter. "
                "Apply a PassbandMap so the column uses AAVSO filter names "
                "before exporting."
            )

    # Pull just the columns we need from each side so the join result is small
    # and the renamed columns are unambiguous. Pairing is on (date-obs,
    # passband) rather than (file, passband) so that submissions covering
    # multiple nights still pair correctly when filenames are reused across
    # nights.
    target_cols = [
        "date-obs",
        "passband",
        "exposure",
        "airmass",
        mag_column,
        mag_error_column,
    ]
    check_cols = ["date-obs", "passband", mag_column]
    target_subset = QTable(phot_data[target_mask][target_cols], copy=True)
    check_subset = QTable(phot_data[check_mask][check_cols], copy=True)

    # Left join on (date-obs, passband): every target row is kept, and the
    # magnitude column shared by both sides is suffixed with "_target" /
    # "_check". Target rows with no matching check observation are NOT
    # dropped by the join itself; they are handled explicitly below.
    paired = join(
        target_subset,
        check_subset,
        keys=["date-obs", "passband"],
        table_names=["target", "check"],
        join_type="left",
    )

    # Detect target rows without a matching check observation. After a left
    # join those rows have the check magnitude masked.
    check_mag_col = f"{mag_column}_check"
    has_mask = hasattr(paired[check_mag_col], "mask")
    if has_mask and paired[check_mag_col].mask.any():
        unmatched = paired[check_mag_col].mask
        if drop_missing_check:
            paired = paired[~unmatched]
            if len(paired) == 0:
                raise ValueError(
                    f"drop_missing_check=True removed every target row; no "
                    f"target observations have a matching check-star "
                    f"observation for check_star_id={check_star_id!r}."
                )
        else:
            # Report the first unmatched observation to keep the message short.
            missing = paired[unmatched][["date-obs", "passband"]]
            first = missing[0]
            raise ValueError(
                "No check-star row found for "
                f"(date-obs={first['date-obs']!r}, passband={first['passband']!r}); "
                f"check_star_id={check_star_id!r} must have a matching "
                "observation for every target observation. Pass "
                "drop_missing_check=True to drop unmatched target rows."
            )

    # Preserve a stable, easy-to-compare row order.
    paired.sort(["date-obs", "passband"])

    group_field = "na" if group is None else str(group)
    trans_field = "YES" if trans else "NO"
    notes_field = notes
    n = len(paired)
    target_mag_col = f"{mag_column}_target"

    # Build per-row string columns in AAVSO order. DATE is the start time
    # plus half the exposure, expressed as a Julian Date with 5 decimals.
    mid_jd = (Time(paired["date-obs"]) + paired["exposure"] / 2).jd
    date_values = [f"{jd:.5f}" for jd in mid_jd]
    mag_values = [_format_mag(v, "MAGNITUDE") for v in paired[target_mag_col]]
    err_values = [_format_magerr(v) for v in paired[mag_error_column]]
    kmag_values = [_format_mag(v, "KMAG") for v in paired[check_mag_col]]
    airmass_values = [_format_airmass(v) for v in paired["airmass"]]
    filter_values = [str(p) for p in paired["passband"]]

    columns = {
        "STARID": [str(target_name)] * n,
        "DATE": date_values,
        "MAGNITUDE": mag_values,
        "MAGERR": err_values,
        "FILTER": filter_values,
        "TRANS": [trans_field] * n,
        "MTYPE": ["STD"] * n,
        "CNAME": ["ENSEMBLE"] * n,
        "CMAG": ["na"] * n,
        "KNAME": [str(check_name)] * n,
        "KMAG": kmag_values,
        "AIRMASS": airmass_values,
        "GROUP": [group_field] * n,
        "CHART": [str(chart)] * n,
        "NOTES": [notes_field] * n,
    }

    # Enforce length limits on every column that has one. Validation fires
    # before I/O.
    out_table = Table()
    for name, limit in FIELD_LIMITS.items():
        values = columns[name]
        if limit is not None:
            values = [_enforce_limit(name, v) for v in values]
        out_table[name] = Column(values, dtype=str)

    # Final sweep: the configured delimiter must not appear anywhere in the
    # rendered data table or in the AAVSO column names. The header model
    # permits any printable ASCII except |/#/space, but values like "."
    # collide with every formatted numeric field and an uppercase letter
    # such as "A" appears in the AAVSO column names — both pass header
    # validation and the per-field user-input checks above (which only
    # cover string fields supplied by the caller) but would produce a
    # mis-parseable file.
    for col_name in FIELD_LIMITS:
        if delimiter in col_name:
            raise ValueError(
                f"AAVSO column name {col_name!r} contains the configured "
                f"delimiter {delimiter!r}; choose a different delimiter."
            )
        for value in out_table[col_name]:
            if delimiter in value:
                raise ValueError(
                    f"AAVSO field {col_name}={value!r} contains the configured "
                    f"delimiter {delimiter!r}; choose a different delimiter."
                )

    # Write the data rows to a string buffer via astropy's ascii writer, then
    # assemble the final file with the parameter header and the
    # column-name row prefixed with "#".
    buf = io.StringIO()
    out_table.write(buf, format="ascii.no_header", delimiter=delimiter)

    # astropy's ascii writer can emit os.linesep into the StringIO on
    # Windows, mixing with the LF terminators we use for the header lines.
    # Normalize to LF here so the open() below sees a uniform "\n" stream
    # and translates the whole file to the platform's native terminator.
    data_text = buf.getvalue().replace("\r\n", "\n").replace("\r", "\n")

    column_header = "#" + delimiter.join(FIELD_LIMITS)

    # utf-8 because user-supplied notes/software fields can contain
    # non-ASCII characters; default newline=None translates "\n" → os.linesep
    # so the file uses native line endings (LF on Unix, CRLF on Windows).
    with open(path, "w", encoding="utf-8") as f:
        for line in header.header_lines():
            f.write(line + "\n")
        f.write(column_header + "\n")
        f.write(data_text)
        # Guarantee the file ends with exactly one trailing newline.
        if not data_text.endswith("\n"):
            f.write("\n")
    return path