"""Writer for the AAVSO Extended File Format used by WebObs.
Implements ensemble photometry submissions (CNAME=ENSEMBLE, CMAG=na) with one
target star and one check star. The data layout follows the spec mirrored in
``stellarphot/io/aavso_submission_schema.yml``.
v1 limitations:
- ``DATE=JD`` only. ``HJD`` and ``EXCEL`` are valid in the header model but
raise ``NotImplementedError`` from the writer.
- ``MTYPE`` is hardcoded to ``STD`` (calibrated/standardized magnitudes), which
is the correct value when CNAME=ENSEMBLE.
- ``OBSTYPE`` is hardcoded to ``CCD``.
"""
import io
from pathlib import Path
import numpy as np
from astropy.table import Column, QTable, Table, join
from astropy.time import Time
from stellarphot.settings.aavso_models import AAVSOFilters
from stellarphot.settings.aavso_submission import AAVSOSubmissionHeader
__all__ = ["write_aavso_extended"]
ALLOWED_EXTENSIONS = frozenset({".txt", ".csv", ".tsv"})
# AAVSO data columns in spec order with their max character counts. ``None``
# means the field has no length limit. The AAVSO sample files prepend a row
# of these names with "#" before the data. AIRMASS is special: the spec says
# it should be truncated rather than rejected; ``_enforce_limit`` handles
# that.
FIELD_LIMITS = {
"STARID": 30,
"DATE": 16,
"MAGNITUDE": 8,
"MAGERR": 6,
"FILTER": None,
"TRANS": None,
"MTYPE": None,
"CNAME": 20,
"CMAG": 8,
"KNAME": 20,
"KMAG": 8,
"AIRMASS": 7,
"GROUP": 5,
"CHART": 20,
"NOTES": None,
}
def _is_valid_filter(value):
try:
AAVSOFilters(value)
except ValueError:
return False
return True
def _enforce_limit(name, value):
"""Validate that the stringified field does not exceed its limit.
AIRMASS truncates; every other limited field raises.
"""
limit = FIELD_LIMITS.get(name)
if limit is None or len(value) <= limit:
return value
if name == "AIRMASS":
return value[:limit]
raise ValueError(
f"AAVSO field {name}={value!r} exceeds the {limit}-character limit "
f"(got {len(value)} characters)."
)
def _require_nonblank(name, value):
"""Strip ``value`` and reject empty/whitespace-only required identifiers."""
if value is None:
raise ValueError(f"AAVSO field {name} is required; got None.")
stripped = str(value).strip()
if not stripped:
raise ValueError(
f"AAVSO field {name} is required; got an empty/whitespace value."
)
return stripped
def _reject_delimiter_or_newline(name, value, delimiter):
"""Reject string fields that contain the configured delimiter or a newline."""
if delimiter in value:
raise ValueError(
f"AAVSO field {name}={value!r} contains the configured delimiter "
f"{delimiter!r}; choose a delimiter that does not appear in the data."
)
if "\n" in value or "\r" in value:
raise ValueError(
f"AAVSO field {name}={value!r} contains a newline; "
"AAVSO rows must be a single line."
)
def _to_float(value):
"""Coerce a value (possibly an astropy ``Quantity``) to a plain float."""
return float(getattr(value, "value", value))
def _format_mag(value, field_name):
"""Format a required magnitude field. Non-finite values raise."""
f = _to_float(value)
if not np.isfinite(f):
raise ValueError(
f"AAVSO field {field_name} is required but the value is "
f"non-finite ({value!r}). Drop these rows before exporting."
)
return f"{f:.4f}"
def _format_magerr(value):
"""Format magnitude error as a 3-decimal float; 'na' for non-finite values."""
f = _to_float(value)
if not np.isfinite(f):
return "na"
return f"{f:.3f}"
def _validate_trans(value):
"""``trans`` controls a required YES/NO field; truthiness would silently
flip a caller's intent (e.g. the string ``"False"`` is truthy)."""
if not isinstance(value, bool):
raise TypeError(
f"trans must be a bool (True or False); got "
f"{type(value).__name__} ({value!r})."
)
def _coerce_group(value):
"""Coerce ``value`` to a non-bool ``int`` or ``None`` for the GROUP field.
Accepts Python ints, numpy integers, integer-valued floats (``5.0``) and
numeric strings (``"5"``). Rejects ``bool``, non-integer floats, and
anything that doesn't convert cleanly to a number.
"""
if value is None:
return None
if isinstance(value, bool):
raise TypeError(f"group must be an int or None; got bool ({value!r}).")
try:
as_float = float(value)
except (TypeError, ValueError) as exc:
raise TypeError(
f"group must be an int or None; got {type(value).__name__} ({value!r})."
) from exc
if not np.isfinite(as_float) or as_float != int(as_float):
raise ValueError(f"group must be an integer value; got {value!r}.")
return int(as_float)
def _format_airmass(value):
"""Format an airmass as a 4-decimal float; return 'na' for non-finite values."""
f = _to_float(value)
if not np.isfinite(f):
return "na"
return f"{f:.4f}"
[docs]
def write_aavso_extended(
phot_data,
path,
*,
header,
target_star_id,
target_name,
check_star_id,
check_name,
chart,
mag_column,
mag_error_column,
trans=False,
group=None,
notes="na",
drop_missing_check=True,
):
"""Write an AAVSO Extended File Format submission for ensemble photometry.
Parameters
----------
phot_data : `stellarphot.PhotometryData`
Table of photometry results. Must contain at least the target star
and the check star, paired by ``(date-obs, passband)``.
path : str or `pathlib.Path`
Destination file. Must have a ``.txt``, ``.csv`` or ``.tsv`` suffix.
header : `stellarphot.settings.AAVSOSubmissionHeader`
Header parameters. Only ``date_format="JD"`` is supported in v1.
target_star_id : str or int
The ``star_id`` value identifying the target rows in ``phot_data``.
target_name : str
The string written into the ``STARID`` column for every target row.
check_star_id : str or int
The ``star_id`` value identifying the check-star rows.
check_name : str
The string written into the ``KNAME`` column.
chart : str
The AAVSO chart sequence ID written into the ``CHART`` column.
mag_column : str
Name of the column in ``phot_data`` containing the calibrated magnitude
for the target. The same column is read for the check-star rows.
mag_error_column : str
Name of the column in ``phot_data`` containing the magnitude error.
trans : bool, optional
``True`` to emit ``TRANS=YES``, ``False`` (default) for ``TRANS=NO``.
group : int or None, optional
Optional grouping identifier. ``None`` (default) emits ``GROUP=na``.
notes : str, optional
Text written into the ``NOTES`` column. Defaults to ``"na"``.
drop_missing_check : bool, optional
How to handle target rows that have no check-star observation at the
same ``(date-obs, passband)``. ``True`` (default) silently drops
those target rows; ``False`` raises ``ValueError``. If dropping
leaves no rows to write, ``ValueError`` is raised regardless.
"""
if not isinstance(header, AAVSOSubmissionHeader):
raise TypeError(
"header must be an AAVSOSubmissionHeader instance; "
f"got {type(header).__name__}."
)
if header.date_format != "JD":
raise NotImplementedError(
f"AAVSO writer only supports DATE=JD in this release; "
f"got date_format={header.date_format!r}."
)
if target_star_id == check_star_id:
raise ValueError(
"target_star_id and check_star_id must be different; "
f"got {target_star_id!r} for both."
)
for col in (mag_column, mag_error_column):
if col not in phot_data.colnames:
raise ValueError(
f"Column {col!r} is not in phot_data; "
f"available columns: {phot_data.colnames}"
)
_validate_trans(trans)
group = _coerce_group(group)
path = Path(path)
if path.suffix.lower() not in ALLOWED_EXTENSIONS:
raise ValueError(
f"AAVSO submission file must have one of {sorted(ALLOWED_EXTENSIONS)} "
f"extensions; got {path.suffix!r}."
)
delimiter = header.data_delimiter
# Required identifier fields supplied by the caller. The AAVSO spec
# forbids leading/trailing whitespace and empty values; we strip and
# then refuse to write a row with a blank required field.
target_name = _require_nonblank("target_name", target_name)
check_name = _require_nonblank("check_name", check_name)
chart = _require_nonblank("chart", chart)
# NOTES is optional; "na" is the spec's missing value. Strip then fall
# back to "na" so users can pass " " without producing a blank field.
notes = str(notes).strip() if notes is not None else ""
if not notes:
notes = "na"
# Reject values that would collide with the delimiter or break the row
# structure. Applies to every user-controlled string field.
for field_name, field_value in (
("target_name", target_name),
("check_name", check_name),
("chart", chart),
("notes", notes),
):
_reject_delimiter_or_newline(field_name, field_value, delimiter)
target_mask = phot_data["star_id"] == target_star_id
check_mask = phot_data["star_id"] == check_star_id
if not target_mask.any():
raise ValueError(f"No rows in phot_data have star_id={target_star_id!r}.")
if not check_mask.any():
raise ValueError(f"No rows in phot_data have star_id={check_star_id!r}.")
# Reject invalid filters before doing any heavier work.
for passband in set(phot_data["passband"][target_mask]):
if not _is_valid_filter(passband):
raise ValueError(
f"Row passband {passband!r} is not a valid AAVSO filter. "
"Apply a PassbandMap so the column uses AAVSO filter names "
"before exporting."
)
# Pull just the columns we need from each side so the join result is small
# and the renamed columns are unambiguous. Pairing is on (date-obs,
# passband) rather than (file, passband) so that submissions covering
# multiple nights still pair correctly when filenames are reused across
# nights.
target_cols = [
"date-obs",
"passband",
"exposure",
"airmass",
mag_column,
mag_error_column,
]
check_cols = ["date-obs", "passband", mag_column]
target_subset = QTable(phot_data[target_mask][target_cols], copy=True)
check_subset = QTable(phot_data[check_mask][check_cols], copy=True)
# Join on (date-obs, passband) — this replaces the manual lookup dictionary
# and naturally drops target rows that have no matching check observation.
paired = join(
target_subset,
check_subset,
keys=["date-obs", "passband"],
table_names=["target", "check"],
join_type="left",
)
# Detect target rows without a matching check observation. After a left
# join those rows have the check magnitude masked.
check_mag_col = f"{mag_column}_check"
has_mask = hasattr(paired[check_mag_col], "mask")
if has_mask and paired[check_mag_col].mask.any():
unmatched = paired[check_mag_col].mask
if drop_missing_check:
paired = paired[~unmatched]
if len(paired) == 0:
raise ValueError(
f"drop_missing_check=True removed every target row; no "
f"target observations have a matching check-star "
f"observation for check_star_id={check_star_id!r}."
)
else:
missing = paired[unmatched][["date-obs", "passband"]]
first = missing[0]
raise ValueError(
"No check-star row found for "
f"(date-obs={first['date-obs']!r}, passband={first['passband']!r}); "
f"check_star_id={check_star_id!r} must have a matching "
"observation for every target observation. Pass "
"drop_missing_check=True to drop unmatched target rows."
)
# Preserve a stable, easy-to-compare row order.
paired.sort(["date-obs", "passband"])
group_field = "na" if group is None else str(group)
trans_field = "YES" if trans else "NO"
notes_field = notes
n = len(paired)
target_mag_col = f"{mag_column}_target"
# Build per-row string columns in AAVSO order.
mid_jd = (Time(paired["date-obs"]) + paired["exposure"] / 2).jd
date_values = [f"{jd:.5f}" for jd in mid_jd]
mag_values = [_format_mag(v, "MAGNITUDE") for v in paired[target_mag_col]]
err_values = [_format_magerr(v) for v in paired[mag_error_column]]
kmag_values = [_format_mag(v, "KMAG") for v in paired[check_mag_col]]
airmass_values = [_format_airmass(v) for v in paired["airmass"]]
filter_values = [str(p) for p in paired["passband"]]
columns = {
"STARID": [str(target_name)] * n,
"DATE": date_values,
"MAGNITUDE": mag_values,
"MAGERR": err_values,
"FILTER": filter_values,
"TRANS": [trans_field] * n,
"MTYPE": ["STD"] * n,
"CNAME": ["ENSEMBLE"] * n,
"CMAG": ["na"] * n,
"KNAME": [str(check_name)] * n,
"KMAG": kmag_values,
"AIRMASS": airmass_values,
"GROUP": [group_field] * n,
"CHART": [str(chart)] * n,
"NOTES": [notes_field] * n,
}
# Enforce length limits on every column that has one. Validation fires before I/O.
out_table = Table()
for name, limit in FIELD_LIMITS.items():
values = columns[name]
if limit is not None:
values = [_enforce_limit(name, v) for v in values]
out_table[name] = Column(values, dtype=str)
# Final sweep: the configured delimiter must not appear anywhere in the
# rendered data table or in the AAVSO column names. The header model
# permits any printable ASCII except |/#/space, but values like "."
# collide with every formatted numeric field and an uppercase letter
# such as "A" appears in the AAVSO column names — both pass header
# validation and the per-field user-input checks above (which only
# cover string fields supplied by the caller) but would produce a
# mis-parseable file.
for col_name in FIELD_LIMITS:
if delimiter in col_name:
raise ValueError(
f"AAVSO column name {col_name!r} contains the configured "
f"delimiter {delimiter!r}; choose a different delimiter."
)
for value in out_table[col_name]:
if delimiter in value:
raise ValueError(
f"AAVSO field {col_name}={value!r} contains the configured "
f"delimiter {delimiter!r}; choose a different delimiter."
)
# Write the data rows to a string buffer via astropy's ascii writer, then
# assemble the final file with the parameter header and the
# column-name row prefixed with "#".
buf = io.StringIO()
out_table.write(buf, format="ascii.no_header", delimiter=delimiter)
# astropy's ascii writer can emit os.linesep into the StringIO on
# Windows, mixing with the LF terminators we use for the header lines.
# Normalize to LF here so the open() below sees a uniform "\n" stream
# and translates the whole file to the platform's native terminator.
data_text = buf.getvalue().replace("\r\n", "\n").replace("\r", "\n")
column_header = "#" + delimiter.join(FIELD_LIMITS)
# utf-8 because user-supplied notes/software fields can contain
# non-ASCII characters; default newline=None translates "\n" → os.linesep
# so the file uses native line endings (LF on Unix, CRLF on Windows).
with open(path, "w", encoding="utf-8") as f:
for line in header.header_lines():
f.write(line + "\n")
f.write(column_header + "\n")
f.write(data_text)
if not data_text.endswith("\n"):
f.write("\n")
return path