import re
import numpy as np
import pandas as pd
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.table import Column, QTable, Table, TableAttribute
from astropy.time import Time
from astropy.wcs import WCS
from astroquery.vizier import Vizier
from .settings import Camera, Observatory, PassbandMap
__all__ = [
"BaseEnhancedTable",
"PhotometryData",
"CatalogData",
"apass_dr9",
"vsx_vizier",
"SourceListData",
]
[docs]
class BaseEnhancedTable(QTable):
"""
A class to validate an `astropy.table.QTable` table of astronomical data during
creation and store metadata as attributes.
This is based on the `astropy.timeseries.QTable` class. We extend this to
allow for checking of the units for the columns to match the table_description,
but what this returns is just an `astropy.table.QTable`.
Parameters
----------
input_data: `astropy.table.QTable` (Default: None)
A table containing astronomical data of interest. This table
will be checked to make sure all columns listed in table_description
exist and have the right units. Additional columns that
may be present in data but are not listed in table_description will
NOT be removed. This data is copied, so any changes made during
validation will not only affect the data attribute of the instance,
the original input data is left unchanged.
table_description: dict or dict-like (Default: None)
This is a dictionary where each key is a required table column name
and the value corresponding to each key is the required dtype
(can be None). This is used to check the format of the input data
table. Columns will be output in the order of the keys in the dictionary
with any additional columns tacked on the end.
colname_map: dict, optional (Default: None)
A dictionary containing old column names as keys and new column
names as values. This is used to automatically update the column
names to the desired names BEFORE the validation is performed.
Notes
-----
This class is based on the `astropy.timeseries.QTable` class. If no
table_description and no input_data is provided, then an empty QTable
is returned. If table_description and input_data are provided, then
validation of the inputs is performed and the resulting table is returned.
"""
def __init__(
self, *args, input_data=None, table_description=None, colname_map=None, **kwargs
):
if (table_description is None) and (input_data is None):
# Assume user is trying to create an empty table and let QTable
# handle it
super().__init__(*args, **kwargs)
else:
# Confirm a proper table description is passed (that is dict-like with keys
# and values)
try:
self._table_description = {k: v for k, v in table_description.items()}
except AttributeError as err:
raise TypeError(
"You must provide a dict as table_description (input "
f"table_description is type {type(table_description)})."
) from err
# Check data before copying to avoid recursive loop and non-QTable
# data input.
if not isinstance(input_data, Table) or isinstance(
input_data, BaseEnhancedTable
):
raise TypeError(
"You must provide an astropy Table and NOT a "
"BaseEnhancedTable as input_data (currently of "
f"type {type(input_data)})."
)
# Copy data before potential modification
data = input_data.copy()
# Rename columns before validation (if needed)
if colname_map is not None:
# Confirm a proper colname_map is passed
try:
self._colname_map = {k: v for k, v in colname_map.items()}
except AttributeError as err:
raise TypeError(
"You must provide a dict as table_description "
"(input table_description is type "
f"{type(self._table_description)})."
) from err
self._update_colnames(self._colname_map, data)
# Validate the columns
self._validate_columns(data)
# Revise column order to be in the order listed in table_description
# with unlisted columns tacked on the end
order_col_list = list(self._table_description.keys())
for col in data.colnames:
if col not in order_col_list:
order_col_list.append(col)
data = data[order_col_list]
# Call QTable initializer to finish up
super().__init__(data=data, **kwargs)
def _validate_columns(self, data):
# Check the format of the data table matches the table_description by
# checking each column listed in table_description exists and is the
# correct units.
# NOTE: This ignores any columns not in the table_description, it
# does not remove them.
for this_col, this_unit in self._table_description.items():
if this_unit is not None:
# Check type
try:
if data[this_col].unit != this_unit:
raise ValueError(
f"data['{this_col}'] is of wrong unit "
f"(should be {this_unit} but reported "
f"as {data[this_col].unit})."
)
except KeyError as err:
raise ValueError(
f"data['{this_col}'] is missing from input " "data."
) from err
else: # Check that columns with no units but are required exist!
try:
_ = data[this_col]
except KeyError as err:
raise ValueError(
f"data['{this_col}'] is missing from input " "data."
) from err
def _update_colnames(self, colname_map, data):
# Change column names as desired, done before validating the columns,
# which is why we work on _orig_data
for orig_name, new_name in colname_map.items():
try:
data.rename_column(orig_name, new_name)
except KeyError as err:
raise ValueError(
f"data['{orig_name}'] is missing from input "
"data but listed in colname_map!"
) from err
def _update_passbands(self):
# Converts filter names in filter column to AAVSO standard names
# Assumes _passband_map is in namespace.
# Create a list of new passband names instead of trying to change names in place
# in case any of the new names are longer than the longest of the old names.
# If that happens, astropy by default just truncates the names.
new_filter_name = [
(self._passband_map[orig_pb] if orig_pb in self._passband_map else orig_pb)
for orig_pb in self["passband"]
]
self["passband"] = new_filter_name
[docs]
def clean(self, remove_rows_with_mask=False, **other_restrictions):
"""
Return a catalog with only the rows that meet the criteria specified.
Parameters
----------
catalog : `astropy.table.Table`
Table of catalog information. There are no restrictions on the columns.
remove_rows_with_mask : bool, optional
If ``True``, remove rows in which one or more of the values is masked.
other_restrictions: dict, optional
Key/value pairs in which the key is the name of a column in the
catalog and the value is the criteria that values in that column
must satisfy to be kept in the cleaned catalog. The criteria must be
simple, beginning with a comparison operator and including a value.
See Examples below.
Returns
-------
same type as object whose method was called
Table with filtered data
Examples
--------
>>> from astropy.table import Table
>>> from stellarphot import BaseEnhancedTable # Any subclasses will work too
>>> t = Table([[1.0, 2.0, 3.0], [1.0, 2.0, 3.0]], names=('a', 'b'), masked=True)
>>> bet = BaseEnhancedTable(t)
>>> bet['a'].mask = [True, False, False]
>>> bet['b'].mask = [False, False, True]
>>> bet.clean(remove_rows_with_mask=True)
<BaseEnhancedTable length=1>
a b
float64 float64
------- -------
2.0 2.0
>>> bet.clean(a='>2')
<BaseEnhancedTable length=1>
a b
float64 float64
------- -------
3.0 --
"""
comparisons = {
"<": np.less,
"=": np.equal,
">": np.greater,
"<=": np.less_equal,
">=": np.greater_equal,
"!=": np.not_equal,
}
recognized_comparison_ops = "|".join(comparisons.keys())
criteria_re = re.compile(rf"({recognized_comparison_ops})([-+a-zA-Z0-9]+)")
keepers = np.ones([len(self)], dtype=bool)
if remove_rows_with_mask and self.has_masked_values:
for c in self.columns:
keepers &= ~self[c].mask
for column, restriction in other_restrictions.items():
results = criteria_re.match(restriction)
if not results:
raise ValueError(f"Criteria {column}{restriction} not " "understood.")
comparison_func = comparisons[results.group(1)]
comparison_value = results.group(2)
new_keepers = comparison_func(self[column], float(comparison_value))
keepers = keepers & new_keepers
return self[keepers]
[docs]
class PhotometryData(BaseEnhancedTable):
"""
A modified `astropy.table.QTable` to hold reduced photometry data that
provides the convenience of validating the data table is in the proper
format including units. It returns an `PhotometryData` which is
a `astropy.table.QTable` with additional attributes describing
the observatory and camera.
Parameters
----------
input_data: `astropy.table.QTable`, optional (Default: None)
A table containing all the instrumental aperture photometry results
to be validated. Note: It is allowed for the 'ra' and 'dec' columns
to have np.nan values, but if they do, the 'bjd' column will not be
computed and will also be left with 'np.nan'.
observatory: `stellarphot.settings.Observatory`, optional (Default: None)
Information about the observatory.
camera: `stellarphot.Camera`, optional (Default: None)
A description of the CCD used to perform the photometry.
colname_map: dict, optional (Default: None)
A dictionary containing old column names as keys and new column
names as values. This is used to automatically update the column
names to the desired names before the validation is performed.
passband_map: `stellarphot.settings.PassbandMap`, optional (Default: None)
An object containing a mapping from instrumental passband names to
AAVSO passband names. This is used to automatically
update the passband column to AAVSO standard names if desired. See
the documentation for `stellarphot.settings.PassbandMap` for more
information. The object behaves like a dictionary when accessing it.
retain_user_computed: bool, optional (Default: False)
If True, any computed columns (see USAGE NOTES below) that already
exist in `data` will be retained. If False, will throw an error
if any computed columns already exist in `data`.
Attributes
----------
camera: `stellarphot.Camera`
A description of the CCD used to perform the photometry.
observatory: `stellarphot.settings.Observatory`, optional (Default: None)
Information about the observatory.
Notes
-----
For validation of inputs, you must provide camera, observatory, AND input_data,
if you do not, an empty table will be returned.
To be accepted as valid, the `input_data` must MUST contain the following columns
with the following units. The data in those columns is NOT validated, the values in
those columns could be invalid. Furthermore, the 'consistent count units' below
simply means it can be any unit, but it must be the same for all the columns with
'consistent count units'.
================= =======
Column name Unit
----------------- -------
star_id None
RA u.deg
Dec u.deg
xcenter u.pix
ycenter u.pix
fwhm_x u.pix
fwhm_y u.pix
width u.pix
aperture u.pix
aperture_area u.pix
annulus_inner u.pix
annulus_outer u.pix
annulus_area u.pix
aperture_sum consistent count units
annulus_sum consistent count units
sky_per_pix_avg consistent count units (per pixel)
sky_per_pix_med consistent count units (per pixel)
sky_per_pix_std consistent count units (per pixel)
aperture_net_cnts consistent count units
noise_cnts consistent count units
noise_electrons u.electron
snr None
mag_inst None
mag_error None
exposure u.s
date-obs astropy.time.Time with scale='utc'
airmass None
passband None
file None
================= =======
In addition to these required columns, the following columns are created based
on the input data during creation.
bjd (only if ra and dec are all real numbers, otherwise set to np.nan)
night
If these computed columns already exist in `data` class the class
will throw an error a ValueError UNLESS`ignore_computed=True`
is passed to the initializer, in which case the columns will be
retained and not replaced with the computed values.
"""
# Define columns that must be in table and provide information about their type, and
# units.
phot_descript = {
"star_id": None,
"ra": u.deg,
"dec": u.deg,
"xcenter": u.pix,
"ycenter": u.pix,
"fwhm_x": u.pix,
"fwhm_y": u.pix,
"width": u.pix,
"aperture": u.pix,
"aperture_area": u.pix,
"annulus_inner": u.pix,
"annulus_outer": u.pix,
"annulus_area": u.pix,
"aperture_sum": None,
"annulus_sum": None,
"sky_per_pix_avg": None,
"sky_per_pix_med": None,
"sky_per_pix_std": None,
"aperture_net_cnts": None,
"noise_cnts": None,
"noise_electrons": u.electron,
"snr": None,
"mag_inst": None,
"mag_error": None,
"exposure": u.second,
"date-obs": None,
"airmass": None,
"passband": None,
"file": None,
}
observatory = TableAttribute(default=None)
camera = TableAttribute(default=None)
def __init__(
self,
*args,
input_data=None,
colname_map=None,
passband_map=None,
retain_user_computed=False,
**kwargs,
):
if (
(self.observatory is None)
and (self.camera is None)
and (input_data is None)
):
super().__init__(*args, **kwargs)
else:
# Check the time column is correct format and scale
try:
if input_data["date-obs"][0].scale != "utc":
raise ValueError(
"input_data['date-obs'] astropy.time.Time must "
"have scale='utc', "
f"not '{input_data['date-obs'][0].scale}'."
)
except AttributeError as err:
# Happens if first item doesn't have a "scale"
raise ValueError(
"input_data['date-obs'] isn't column of "
"astropy.time.Time entries."
) from err
# Convert input data to QTable (while also checking for required columns)
super().__init__(
input_data=input_data,
table_description=self.phot_descript,
colname_map=colname_map,
**kwargs,
)
# Perform input validation
if not isinstance(self.observatory, Observatory):
raise TypeError(
"observatory must be an "
"stellarphot.settings.Observatory object instead "
f"of type {type(self.observatory)}."
)
if not isinstance(self.camera, Camera):
raise TypeError(
"camera must be a stellarphot.Camera object instead "
f"of type {type(self.camera)}."
)
# Check for consistency of counts-related columns
counts_columns = [
"aperture_sum",
"annulus_sum",
"aperture_net_cnts",
"noise_cnts",
]
counts_per_pixel_columns = [
"sky_per_pix_avg",
"sky_per_pix_med",
"sky_per_pix_std",
]
cnts_unit = self[counts_columns[0]].unit
for this_col in counts_columns[1:]:
if input_data[this_col].unit != cnts_unit:
raise ValueError(
f"input_data['{this_col}'] has inconsistent units "
f"with input_data['{counts_columns[0]}'] (should "
f"be {cnts_unit} but it's "
f"{input_data[this_col].unit})."
)
for this_col in counts_per_pixel_columns:
if cnts_unit is None:
perpixel = u.pixel**-1
else:
perpixel = cnts_unit * u.pixel**-1
if input_data[this_col].unit != perpixel:
raise ValueError(
f"input_data['{this_col}'] has inconsistent units "
f"with input_data['{counts_columns[0]}'] (should "
f"be {perpixel} but it's "
f"{input_data[this_col].unit})."
)
# Compute additional columns (not done yet)
computed_columns = ["bjd", "night"]
# Check if columns exist already, if they do and retain_user_computed is
# False, throw an error.
for this_col in computed_columns:
if this_col in self.colnames:
if not retain_user_computed:
raise ValueError(
f"Computed column '{this_col}' already exist "
"in data. If you want to keep them, set "
"retain_user_computed=True."
)
else:
# Compute the columns that need to be computed (match requires
# python>=3.10)
match this_col:
case "bjd":
self["bjd"] = self.add_bjd_col(self.observatory)
case "night":
# Generate integer counter for nights. This should be
# approximately the MJD at noon local before the evening of
# the observation.
hr_offset = int(
self.observatory.earth_location.lon.value / 15
)
# Compute offset to 12pm Local Time before evening
LocalTime = Time(self["date-obs"]) + hr_offset * u.hr
hr = LocalTime.ymdhms.hour
# Compute number of hours to shift to arrive at 12 noon
# local time
shift_hr = hr.copy()
shift_hr[hr < 12] = shift_hr[hr < 12] + 12
shift_hr[hr >= 12] = shift_hr[hr >= 12] - 12
delta = (
-shift_hr * u.hr
- LocalTime.ymdhms.minute * u.min
- LocalTime.ymdhms.second * u.s
)
shift = Column(data=delta, name="shift")
# Compute MJD at local noon before the evening of this
# observation.
self["night"] = Column(
data=np.array(
(Time(self["date-obs"]) + shift).to_value("mjd"),
dtype=int,
),
name="night",
)
case _:
raise ValueError(
f"Trying to compute column ({this_col}). "
"This should never happen."
)
# Apply the filter/passband name update
if passband_map is not None:
self._passband_map = passband_map.model_copy()
self._update_passbands()
[docs]
def add_bjd_col(self, observatory):
"""
Returns a astropy column of barycentric Julian date times corresponding to
the input observations. It modifies that table in place.
"""
if np.isnan(self["ra"]).any() or np.isnan(self["dec"]).any():
print(
"WARNING: BJD could not be computed in output PhotometryData object "
"because some RA or Dec values are missing."
)
return np.full(len(self), np.nan)
else:
# Convert times at start of each observation to TDB (Barycentric Dynamical
# Time)
times = Time(self["date-obs"])
times_tdb = times.tdb
times_tdb.format = "jd" # Switch to JD format
# Compute light travel time corrections
ip_peg = SkyCoord(ra=self["ra"], dec=self["dec"], unit="degree")
ltt_bary = times.light_travel_time(
ip_peg, location=observatory.earth_location
)
time_barycenter = times_tdb + ltt_bary
# Return BJD at midpoint of exposure at each location
return Time(time_barycenter + self["exposure"] / 2, scale="tdb")
[docs]
class CatalogData(BaseEnhancedTable):
"""
A class to hold astronomical catalog data while performing validation
to confirm the minimum required columns ('id', 'ra', 'dec', 'mag', and
'passband') are present and have the correct units.
As a convenience function, when the user passes in an astropy table to validate,
the user can also pass in a col_rename dict which can be used to rename columns
in the data table BEFORE the check that the required columns are present.
Parameters
----------
input_data: `astropy.table.Table`, optional (Default: None)
A table containing all the astronomical catalog data to be validated.
This data is copied, so any changes made during validation will not
affect the input data, only the data in the class.
catalog_name: str, optional (Default: None)
User readable name for the catalog.
catalog_source: str, optional (Default: None)
User readable designation for the source of the catalog (could be a
URL or a journal reference).
colname_map: dict, optional (Default: None)
A dictionary containing old column names as keys and new column
names as values. This is used to automatically update the column
names to the desired names BEFORE the validation is performed.
passband_map: `stellarphot.settings.PassbandMap`, optional (Default: None)
An object containing a mapping from instrumental passband names to
AAVSO passband names. This is used to automatically
update the passband column to AAVSO standard names if desired. See
the documentation for `stellarphot.settings.PassbandMap` for more
information. The object behaves like a dictionary when accessing it.
Attributes
----------
catalog_name: str
User readable name for the catalog.
catalog_source: str
User readable designation for the source of the catalog (could be a
URL or a journal reference).
passband_map: `stellarphot.settings.PassbandMap`
An object containing a mapping from instrumental passband names to
AAVSO passband names. This is used to automatically
update the passband column to AAVSO standard names if desired. See
the documentation for `stellarphot.settings.PassbandMap` for more
information. The object behaves like a dictionary when accessing it.
Notes
-----
For validation of inputs, you must provide input_data, catalog_name, and
catalog_source. If you do not, an empty table will be returned.
input_data MUST contain the following columns with the following units:
================= =======
Column Name Unit
----------------- -------
id None
ra u.deg
dec u.deg
mag None
passband None
================= =======
"""
# Define columns that must be in table and provide information about their type, and
# units.
catalog_descript = {
"id": None,
"ra": u.deg,
"dec": u.deg,
"mag": None,
"passband": None,
}
def __init__(
self,
*args,
input_data=None,
catalog_name=None,
catalog_source=None,
colname_map=None,
passband_map=None,
**kwargs,
):
if (input_data is None) and (catalog_name is None) and (catalog_source is None):
super().__init__(*args, **kwargs)
else:
self._passband_map = passband_map
if input_data is not None:
# Convert input data to QTable (while also checking for required
# columns)
super().__init__(
table_description=self.catalog_descript,
input_data=input_data,
colname_map=colname_map,
**kwargs,
)
# Add the TableAttributes directly to meta (and adding attribute
# functions below) since using TableAttributes results in a
# inability to access the values to due a
# AttributeError: 'TableAttribute' object has no attribute 'name'
self.meta["catalog_name"] = str(catalog_name)
self.meta["catalog_source"] = str(catalog_source)
# Apply the filter/passband name update
if passband_map is not None:
self._passband_map = passband_map.copy()
self._update_passbands()
else:
raise ValueError("You must provide input_data to CatalogData.")
@property
def catalog_name(self):
return self.meta["catalog_name"]
@property
def catalog_source(self):
return self.meta["catalog_source"]
@property
def passband_map(self):
return self._passband_map
@passband_map.setter
def passband_map(self, value):
self._passband_map = value
@staticmethod
def _tidy_vizier_catalog(data, mag_column_regex, color_column_regex):
"""
Transform a Vizier catalog with magnitudes into tidy structure. Or
at least tidier -- this only handles changing magnitude and color
columns into tidy format. In that format each row is a single
observation of a single object in a single passband.
Parameters
----------
data : `astropy.table.Table`
Table of catalog information. There are no restrictions on the columns.
mag_column_regex : str
Regular expression to match magnitude columns.
color_column_regex : str
Regular expression to match color columns.
Returns
-------
`astropy.table.Table`
Table with magnitude and color columns in tidy format. All other
columns are preserved as they were in the input data.
"""
mag_re = re.compile(mag_column_regex)
color_re = re.compile(color_column_regex)
# Find all the magnitude and color columns
mag_match = [mag_re.match(col) for col in data.colnames]
color_match = [color_re.match(col) for col in data.colnames]
# create a single list of all the matches
matches = [
m_match if m_match else c_match
for m_match, c_match in zip(mag_match, color_match, strict=True)
]
# The passband should be the first group match.
passbands = [match[1] for match in matches if match]
# The original column names for those that match
orig_cols = [match.string for match in matches if match]
# Each magnitude column should have an error column whose name
# is the magnitude column name with 'e_' prepended. While prepending
# is what pandas will need to transform the data, many non-magnitude
# columns will also start ``e_`` and we don't want to change those,
# so we will rename the error columns too.
mag_err_cols = [f"e_{col}" for col in orig_cols]
# Dictionary to update the magnitude column names. The prepended
# part could be anything, but the choice below is unlikely to be
# used in a column name in a real catalog.
mag_col_prepend = "magstphot"
mag_col_map = {
orig_col: f"{mag_col_prepend}_{passband}"
for orig_col, passband in zip(orig_cols, passbands, strict=True)
}
# Dictionary to update the magnitude error column names. The
# prepended part could be anything, but the choice below is
# unlikely to be used in a column name in a real catalog.
mag_err_col_prepend = "errorstphot"
mag_err_col_map = {
orig_col: f"{mag_err_col_prepend}_{passband}"
for orig_col, passband in zip(mag_err_cols, passbands, strict=True)
}
# All columns except those we have renamed should be preserved, so make
# a list of them for use in wide_to_long.
id_columns = set(data.colnames) - set(orig_cols) - set(mag_err_cols)
# Make the input data into a Pandas DataFrame
df = data.to_pandas()
# Rename the magnitude and magnitude error columns
df.rename(columns=mag_col_map, inplace=True)
df.rename(columns=mag_err_col_map, inplace=True)
# Make the DataFrame tidy
df = pd.wide_to_long(
df,
stubnames=[mag_col_prepend, mag_err_col_prepend],
i=id_columns,
j="passband",
sep="_",
suffix=".*",
)
# Make the magnitude and error column names more sensible
df.rename(
columns={mag_col_prepend: "mag", mag_err_col_prepend: "mag_error"},
inplace=True,
)
# Reset the index, which is otherwise a multi-index of the id columns.
df = df.reset_index()
# Convert back to an astropy table
return Table.from_pandas(df)
[docs]
@classmethod
def from_vizier(
cls,
field_center,
desired_catalog,
radius=0.5 * u.degree,
clip_by_frame=False,
padding=100,
colname_map=None,
mag_column_regex=r"^([a-zA-Z]+|[a-zA-Z]+-[a-zA-Z]+)_?mag$",
color_column_regex=r"^([a-zA-Z]+-[a-zA-Z]+)$",
prepare_catalog=None,
):
"""
Return the items from catalog that are within the search radius and
(optionally) within the field of view of a frame.
Parameters
----------
field_center : `astropy.coordinates.SkyCoord`, `astropy.wcs.WCS`, or FITS header
Either a `~astropy.coordinates.SkyCoord` object, a `~astropy.wcs.WCS` object
or a FITS header with WCS information. The input coordinate should be the
center of the frame; if a header or WCS is the input then the center of the
frame will be determined from the WCS.
desired_catalog : str
Vizier name of the catalog to be searched.
radius : float, optional
Radius, in degrees, around which to search. Default is 0.5.
clip_by_frame : bool, optional
If ``True``, only return items that are within the field of view
of the frame. Default is ``True``.
padding : int, optional
Coordinates need to be at least this many pixels in from the edge
of the frame to be considered in the field of view. Default value
is 100.
colname_map : dict, optional
Dictionary mapping column names in the catalog to column names in
a `stellarphot.CatalogData` object. Default is ``None``.
mag_column_regex : str, optional
Regular expression to match magnitude columns. See notes below for
more information about the default value.
color_column_regex : str, optional
Regular expression to match color columns. See notes below for
more information about the default value.
prepare_catalog : callable, optional
Function to call on the catalog after it is retrieved from Vizier.
Returns
-------
`stellarphot.CatalogData`
Table of catalog information.
Notes
-----
In many Vizier catalogs, the magnitude columns are named with a passband
name followed by ``mag``, sometimes with an underscore ``_`` in between.
For example, the Johnson V magnitude column is
``Vmag`` or ``V_mag``. The default value for ``mag_column_regex`` will match any
column name that starts with a letter or letters, followed by ``mag`` or
``_mag`` with an underscore in between.
In many Vizier catalogs, the color columns are named with the passbands
separated by a hyphen. For example, the Johnson V-I color column is
``V-I``. The default value for ``color_column_regex`` will match any
column name that starts with a letter or letters, followed by a hyphen,
followed by a letter or letters.
"""
if isinstance(field_center, SkyCoord):
# Center was passed in, just use it.
center = field_center
if clip_by_frame:
raise ValueError(
"To clip entries by frame you must use "
"a WCS as the first argument."
)
elif isinstance(field_center, WCS):
center = SkyCoord(*field_center.wcs.crval, unit="deg")
else:
wcs = WCS(field_center)
# The header may not have contained WCS information. In that case
# the WCS CTYPE will be empty strings and we need to raise an
# error.
if wcs.wcs.ctype[0] == "" and wcs.wcs.ctype[1] == "":
raise ValueError(
f"Invalid coordinates in input {field_center}. Make sure the "
"header contains valid WCS information or pass in a WCS or "
"coordinate."
)
center = SkyCoord(*wcs.wcs.crval, unit="deg")
# Get catalog via cone search
Vizier.ROW_LIMIT = -1 # Set row_limit to have no limit
cat = Vizier.query_region(center, radius=radius, catalog=desired_catalog)
# Vizier always returns list even if there is only one element. Grab that
# element.
cat = cat[0]
if prepare_catalog is not None:
final_cat = prepare_catalog(cat)
else:
final_cat = CatalogData._tidy_vizier_catalog(
cat, mag_column_regex, color_column_regex
)
# Since we go through pandas, we lose the units, so we need to add them back
# in.
#
# We need to swap the key/values on the input map to get the old column names
# as values.
invert_map = {v: k for k, v in colname_map.items()}
final_cat[invert_map["ra"]].unit = u.deg
final_cat[invert_map["dec"]].unit = u.deg
# Make the CatalogData object....
cat = cls(
input_data=final_cat,
colname_map=colname_map,
catalog_name=desired_catalog,
catalog_source="Vizier",
)
# ...and now that the column names are standardized, clip by frame if
# desired.
if clip_by_frame:
cat_coords = SkyCoord(ra=cat["ra"], dec=cat["dec"])
wcs = WCS(field_center)
x, y = wcs.all_world2pix(cat_coords.ra, cat_coords.dec, 0)
in_x = (x >= padding) & (x <= wcs.pixel_shape[0] - padding)
in_y = (y >= padding) & (y <= wcs.pixel_shape[1] - padding)
in_fov = in_x & in_y
cat = cat[in_fov]
return cat
[docs]
def passband_columns(self, passbands=None):
"""
Return an `astropy.table.Table` with passbands as column names instead
of the default format, which has a single column for passbands.
Parameters
----------
passbands : list, optional
List of passbands to include in the output. If not provided, all
passbands in the catalog will be included.
Returns
-------
`astropy.table.Table`
Table of catalog information with passbands as column names. See Notes below
for important details about column names.
Notes
-----
The column names in the output will be the passband names with ``mag_`` as a
prefix. An error column for each passband will be generated, with the prefix
``mag_error_``. If the catalog already has columns with these names, they will
be overwritten. The input catalog will not be changed.
"""
catalog_passbands = set(self["passband"])
if passbands is None:
passbands = catalog_passbands
input_passbands = set(passbands)
missing_passbands = input_passbands - catalog_passbands
if missing_passbands:
raise ValueError(
f"Passbands \"{', '.join(missing_passbands)}\" not found in catalog."
)
passband_mask = np.zeros(len(self), dtype=bool)
for passband in input_passbands:
passband_mask |= self["passband"] == passband
reduced_input = self[passband_mask]
# Switch to pandas for making the new table.
df = reduced_input.to_pandas()
# This makes a MultiIndex for the columns -- "mag" and "mag_error" are the
# top level, and the passbands are the second level.
df = df.pivot(
columns="passband", index=["id", "ra", "dec"], values=["mag", "mag_error"]
)
# The column names are a MultiIndex, so we flatten them to either "mag_band"
# or "mag_error_band", where "band" is the passband name.
df.columns = df.columns.to_series().str.join("_")
# We also reset the index which was set to the id, ra, and dec columns above.
df = df.reset_index()
# Convert back to an astropy table and return it.
return Table.from_pandas(df)
[docs]
def apass_dr9(field_center, radius=1 * u.degree, clip_by_frame=False, padding=100):
"""
Return the items from APASS DR9 that are within the search radius and
(optionally) within the field of view of a frame.
Parameters
----------
field_center : `astropy.coordinates.SkyCoord`, `astropy.wcs.WCS`, or FITS header
Either a `~astropy.coordinates.SkyCoord` object, a `~astropy.wcs.WCS` object
or a FITS header with WCS information. The input coordinate should be the
center of the frame; if a header or WCS is the input then the center of the
frame will be determined from the WCS.
radius : `astropy.units.Quantity`, optional
Radius around which to search.
clip_by_frame : bool, optional
If ``True``, only return items that are within the field of view
of the frame. Default is ``True``.
padding : int, optional
Coordinates need to be at least this many pixels in from the edge
of the frame to be considered in the field of view. Default value
is 100.
Returns
-------
`stellarphot.CatalogData`
Table of catalog information.
Notes
-----
APASS DR9 does not include an identifier column. Thought Vizier does provide
a ``recno`` column, it is does not stay the same over time. This function generates
an ID based on the coordinates of the APASS star, following the guidelines in
`IAU designation specification <https://cds.unistra.fr/Dic/iau-spec.html>`_.
"""
apass_colnames = {
"recno": "id", # There is no APASS ID, this is the one generated by Vizier
"RAJ2000": "ra",
"DEJ2000": "dec",
}
raw_catalog = CatalogData.from_vizier(
field_center,
"II/336/apass9",
radius=radius,
clip_by_frame=clip_by_frame,
padding=padding,
colname_map=apass_colnames,
)
# IAU requires an acronym to star, so make it APASS plus SP for stellarphot
designation_acronym = "APASSSP"
# The formats below include 4 digits after the decimal point (accuracy of about
# 0.5 arcsec), a leading sign (+ or -) and leading zeros so that the RA is always
# three digits before the decimal and the DEC is always two digits before the
# decimal.
coord_string = [
f"J{ra.to('degree').value:0=+9.4f}{dec.to('degree').value:0=+8.4f}"
for ra, dec in zip(raw_catalog["ra"], raw_catalog["dec"], strict=True)
]
# IAU says there is a space between the acronym and the coordinates.
raw_catalog["id"] = [f"{designation_acronym} {coord}" for coord in coord_string]
# Translate the passbands to AAVSO standard names.
# No need to change B and V since those are already correct.
# Do this *after* initialization so that the original APASS band names
# are used for the tidy-ification operation.
raw_catalog.passband_map = PassbandMap(
name="APASS",
your_filter_names_to_aavso={
"g": "SG",
"r": "SR",
"i": "SI",
},
)
raw_catalog._update_passbands()
return raw_catalog
[docs]
def vsx_vizier(field_center, radius=1 * u.degree, clip_by_frame=False, padding=100):
"""
Return the items from the copy of VSX on Vizier that are within the search
radius and (optionally) within the field of view of a frame.
Parameters
----------
field_center : `astropy.coordinates.SkyCoord`, `astropy.wcs.WCS`, or FITS header
Either a `~astropy.coordinates.SkyCoord` object, a `~astropy.wcs.WCS` object
or a FITS header with WCS information. The input coordinate should be the
center of the frame; if a header or WCS is the input then the center of the
frame will be determined from the WCS.
radius : `astropy.units.Quantity`, optional
Radius around which to search.
clip_by_frame : bool, optional
If ``True``, only return items that are within the field of view
of the frame. Default is ``True``.
padding : int, optional
Coordinates need to be at least this many pixels in from the edge
of the frame to be considered in the field of view. Default value
is 100.
Returns
-------
`stellarphot.CatalogData`
Table of catalog information.
"""
vsx_map = dict(
Name="id",
RAJ2000="ra",
DEJ2000="dec",
)
# This one is easier -- it already has the passband in a column name.
# We'll use the maximum magnitude as the magnitude column.
def prepare_cat(cat):
cat.rename_column("max", "mag")
cat.rename_column("n_max", "passband")
return cat
return CatalogData.from_vizier(
field_center,
"B/vsx/vsx",
radius=radius,
clip_by_frame=clip_by_frame,
padding=padding,
colname_map=vsx_map,
prepare_catalog=prepare_cat,
)
[docs]
class SourceListData(BaseEnhancedTable):
"""
A class to hold information on the source lists to pass to
aperture photometry routines. It verifies either image-based
locations (x/y) or sky-based locations (ra/dec) exist.
Parameters
----------
input_data: `astropy.table.Table`, optional (Default: None)
A table containing all the source list data to be validated.
This data is copied, so any changes made during validation will not
affect the input data, only the data in the class.
colname_map: dict, optional (Default: None)
A dictionary containing old column names as keys and new column
names as values. This is used to automatically update the column
names to the desired names BEFORE the validation is performed.
Attributes
----------
has_ra_dec: bool
True if the table has sky-based locations (ra/dec), False otherwise.
has_x_y: bool
True if the table has image-based locations (x/y), False otherwise.
Notes
-----
For validation of inputs, you must provide input_data, if you do not,
an empty table will be returned.
input_data MUST contain the following columns in the following column:
================= =======
Column Name Unit
----------------- -------
star_id None
================= =======
In addition to the star_id columns you must have EITHER
================= =======
Column Name Unit
----------------- -------
ra u.deg
dec u.deg
================= =======
and/or
================= =======
Column Name Unit
----------------- -------
xcenter u.pix
ycenter u.pix
================= =======
to define the locations of the sources. If one locaton pair is provided but not
the other, the missing columns will be added but assigned NaN values. It is ok
to provide both sky and image location, but no validation is done to ensure they
are consistent.
"""
# Define columns that must be in table and provide information about their type, and
# units.
sourcelist_descript = {
"star_id": None,
"ra": u.deg,
"dec": u.deg,
"xcenter": u.pix,
"ycenter": u.pix,
}
def __init__(self, *args, input_data=None, colname_map=None, **kwargs):
if input_data is None:
super().__init__(*args, **kwargs)
else:
# Check data before copying to avoid recursive loop and non-QTable
# data input.
if not isinstance(input_data, Table) or isinstance(
input_data, BaseEnhancedTable
):
raise TypeError(
"input_data must be an astropy Table (and not a "
"BaseEnhancedTable) as data."
)
# Process inputs and save as needed
data = input_data.copy()
# Rename columns before checking for ra/dec or xcenter/ycenter
# columns being missing.
if colname_map is not None:
# Confirm a proper colname_map is passed
try:
self._colname_map = {k: v for k, v in colname_map.items()}
except AttributeError as err:
raise TypeError(
"You must provide a dict as table_description (it "
f"is type {type(self._colname_map)})."
) from err
self._update_colnames(self._colname_map, data)
# No need to repeat this
self._colname_map = None
else:
self._colname_map = None
# Check if RA/Dec or xcenter/ycenter are missing
ra_dec_present = True
x_y_present = True
nosky_pos = (
"ra" not in data.colnames
or "dec" not in data.colnames
or np.isnan(data["ra"].value).all()
or np.isnan(data["dec"].value).all()
)
noimg_pos = (
"xcenter" not in data.colnames
or "ycenter" not in data.colnames
or np.isnan(data["xcenter"].value).all()
or np.isnan(data["ycenter"].value).all()
)
if nosky_pos:
ra_dec_present = False
if noimg_pos:
x_y_present = False
if nosky_pos and noimg_pos:
raise ValueError(
"data must have either sky (ra, dec) or "
+ "image (xcenter, ycenter) position."
)
# Create empty versions of any missing columns
for this_col in ["ra", "dec", "xcenter", "ycenter"]:
# Create blank ra/dec columns
if this_col not in data.colnames:
data[this_col] = Column(
data=np.full(len(data), np.nan),
name=this_col,
unit=self.sourcelist_descript[this_col],
)
# Convert input data to QTable (while also checking for required columns)
super().__init__(
table_description=self.sourcelist_descript,
input_data=data,
colname_map=None,
**kwargs,
)
self.meta["has_ra_dec"] = ra_dec_present
self.meta["has_x_y"] = x_y_present
@property
def has_ra_dec(self):
return self.meta["has_ra_dec"]
@property
def has_x_y(self):
return self.meta["has_x_y"]
[docs]
def drop_ra_dec(self):
# drop sky-based positions from existing SourceListData structure
self.meta["has_ra_dec"] = False
self["ra"] = Column(data=np.full(len(self), np.nan), name="ra", unit=u.deg)
self["dec"] = Column(data=np.full(len(self), np.nan), name="dec", unit=u.deg)
[docs]
def drop_x_y(self):
# drop image-based positionsfrom existing SourceListData structure
self.meta["has_x_y"] = False
self["xcenter"] = Column(data=np.full(len(self), np.nan), name="ra", unit=u.deg)
self["ycenter"] = Column(
data=np.full(len(self), np.nan), name="dec", unit=u.deg
)