Source code for stellarphot.photometry.photometry

import logging
import warnings
from pathlib import Path

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.nddata import CCDData, NoOverlapError
from astropy.stats import SigmaClip
from astropy.table import Column, vstack
from astropy.time import Time
from astropy.utils.exceptions import AstropyUserWarning
from astropy.wcs import FITSFixedWarning
from ccdproc import ImageFileCollection
from photutils.aperture import (
    ApertureStats,
    CircularAnnulus,
    CircularAperture,
    aperture_photometry,
)
from photutils.centroids import centroid_sources
from pydantic import BaseModel, validate_call
from scipy.spatial.distance import cdist

from stellarphot import PhotometryData, SourceListData
from stellarphot.settings import (
    Camera,
    Observatory,
    PhotometryApertures,
    PhotometryOptionalSettings,
    PhotometrySettings,
)

from .source_detection import compute_fwhm, fast_fwhm_from_image

__all__ = [
    "AperturePhotometry",
    "single_image_photometry",
    "multi_image_photometry",
    "find_too_close",
    "clipped_sky_per_pix_stats",
    "calculate_noise",
]

# Allowed FITS header keywords for exposure values
EXPOSURE_KEYWORDS = ["EXPOSURE", "EXPTIME", "TELAPSE", "ELAPTIME", "ONTIME", "LIVETIME"]


[docs] class AperturePhotometry(BaseModel): """Class to perform aperture photometry on one or more images""" settings: PhotometrySettings @validate_call def __call__( self, file_or_directory: str | Path, **kwargs, ) -> PhotometryData: """ Perform aperture photometry on a single image or a directory of images. Parameters ---------- file_or_directory : str or Path The file or directory on which to perform aperture photometry. logline : str, optional (Default: "single_image_photometry:") String to prepend to all log messages, *only used for single image photometry*. reject_unmatched : bool, optional (Default: True) If ``True``, any sources that are not detected on all the images are rejected. If you are interested in a source that can intermittently fall below your detection limits, we suggest setting this to ``False`` so that all sources detected on each image are reported. *Only used for multi-image photometry*. object_of_interest : str, optional (Default: None) Name of the object of interest. The only files on which photometry will be done are those whose header contains the keyword ``OBJECT`` whose value is ``object_of_interest``. *Only used for multi-image photometry* to select which files to perform photometry on. Returns ------- photom_data : `stellarphot.PhotometryData` Photometry data for all the sources on which aperture photometry was performed in all the images. This may be a subset of the sources in the source list if locations were too close to the edge of any one image or to each other for successful aperture photometry. """ # Make sure we have a Path object path = Path(file_or_directory) if path.is_dir(): photom_data = multi_image_photometry(path, self.settings, **kwargs) elif path.is_file(): image = CCDData.read(path) photom_data = single_image_photometry( image, self.settings, fname=str(path), **kwargs ) else: raise ValueError( f"file_or_directory '{path}' is not a valid file or directory." ) return photom_data
[docs] def single_image_photometry( ccd_image, photometry_settings, fname=None, logline="single_image_photometry:", ): """ Perform aperture photometry on a single image, with an options for estimating the local background from sigma clipped stats of the counts in an annulus around the aperture. If the sky positions of the sources are in the sourcelist and not the image positions (x,y), then the function will compute the image positions from the sky positions using the WCS of the image. It assumes the input sky positions are in decimal degree units and in the ICRS frame. Parameters ---------- ccd_image : `astropy.nddata.CCDData` Image on which to perform aperture photometry. It's headers must contain DATE-OBS, FILTER, and exposure time in units of seconds (identified by one of the following keywords: EXPOSURE, EXPTIME, TELAPSE, ELAPTIME, ONTIME, or LIVETIME). If AIRMASS is available it will be added to `phot_table`. This image must also have a WCS header associated with it if you want to use sky positions as inputs. photometry_settings : `stellarphot.settings.PhotometrySettings` Photometry settings to use for the photometry. This includes the camera, observatory, the aperture and annulus radii to use for the photometry, a map of passbands from your filters to AAVSO filter names, and options for the photometry. See `stellarphot.settings.PhotometrySettings` for more information. fname : str, optional (Default: None) Name of the image file on which photometry is being performed, used for logging and tracking purposes, the file is not accessed or modified. logline : str, optional (Default: "single_image_photometry:") String to prepend to all log messages. Returns ------- phot_table : `stellarphot.PhotometryData` Photometry data for all the locations in which aperture photometry was performed. This may be a subset of the sources in the sourcelist if locations were too close to the edge of the image or to each other for successful aperture photometry. If pixel (x/y) positions were used for the photometry, but a valid WCS header was not available for `ccd_image`, the output 'ra', 'dec', and 'bjd' columns will have np.nan values dropped_sources : list This of the star_ids of the sources that fell outside the image or were too close together and did not have photometry performed. Notes ----- The default behavior (set by `use_coordinates`="pixel") for determining the placement of apertures is to use the x/y positions in the `sourcelist`. This is because in most cases, ra/dec positions are derived from the x/y positions and the WCS of the `ccd_image`. This default reduces unnecessary computation and works if the WCS is not very accurate. When attempting to process multiple images of the same field, it makes sense to use the ra/dec positions in the sourcelist and the WCS of the `ccd_image` to compute the x/y positions on each image individually. In this scenario, the `use_coordinates` parameter should be set to "sky". """ sourcelist = SourceListData.read( photometry_settings.source_location_settings.source_list_file ) camera = photometry_settings.camera observatory = photometry_settings.observatory photometry_apertures = photometry_settings.photometry_apertures photometry_options = photometry_settings.photometry_optional_settings logging_options = photometry_settings.logging_settings passband_map = photometry_settings.passband_map use_coordinates = photometry_settings.source_location_settings.use_coordinates # Check that the input parameters are valid if not isinstance(ccd_image, CCDData): raise TypeError( "ccd_image must be a CCDData object, but it is " f"'{type(ccd_image)}'." ) if not isinstance(sourcelist, SourceListData): raise TypeError( "sourcelist must be a SourceListData object, but it is " f"'{type(sourcelist)}'." ) if not isinstance(camera, Camera): raise TypeError(f"camera must be a Camera object, but it is '{type(camera)}'.") if not isinstance(observatory, Observatory): raise TypeError( "observatory_location must be a EarthLocation object, but it " f"is '{type(observatory)}'." ) if not isinstance(photometry_apertures, PhotometryApertures): raise TypeError( "photometry_apertures must be a PhotometryApertures object, but it is " f"'{type(photometry_apertures)}'." ) if not isinstance(photometry_options, PhotometryOptionalSettings): raise TypeError( "photometry_options must be a PhotometryOptions object, but it is " f"'{type(photometry_options)}'." ) # Set up logging logfile = logging_options.logfile console_log = logging_options.console_log # If we have no handlers, set up logging logger = logging.getLogger("single_image_photometry") # Use this to catch if logging was already set up by a call # from multi_image_photometry before creating new log file handler # (Warning: This actually just catches if the logger has any handlers. # Probably not the best way to do this.) fh_exists = False # Default to filehandler not existing if logger.hasHandlers() is False: logger.setLevel(logging.INFO) # Set up logging to a file (in addition to any logging below) if logfile is not None: # by default this appends to existing logfile fh = logging.FileHandler(logfile) log_format = logging.Formatter("%(levelname)s - %(message)s") fh.setFormatter(log_format) fh.setLevel(logging.INFO) logger.addHandler(fh) fh_exists = True # Set up logging to console if requested if console_log: ch = logging.StreamHandler() else: # otherwise effectively suppress output ch = logging.NullHandler() console_format = logging.Formatter("%(message)s") ch.setFormatter(console_format) ch.setLevel(logging.INFO) logger.addHandler(ch) # # Check CCDData headers before proceeding # # Search for exposure keyword in header and set exposure if found matched_kw = None for kw in EXPOSURE_KEYWORDS: if kw in ccd_image.header: matched_kw = kw break if matched_kw is None: logger.warning( f"{logline} None of the accepted exposure keywords " f"({format(', '.join(EXPOSURE_KEYWORDS))}) found in the " "header ... SKIPPING THIS IMAGE!" ) return None, None exposure = ccd_image.header[matched_kw] # Search for other keywords that are required try: date_obs = ccd_image.header["DATE-OBS"] except KeyError: logger.warning( f"{logline} 'DATE-OBS' not found in CCD image header " "... SKIPPING THIS IMAGE!" ) return None, None try: filter = ccd_image.header["FILTER"] except KeyError: logger.warning( f"{logline} 'FILTER' not found in CCD image header ... " "SKIPPING THIS IMAGE!" ) return None, None # Set high pixels to NaN (make sure ccd_image.data is a float array first) ccd_image.data = ccd_image.data.astype(float) ccd_image.data[ccd_image.data > camera.max_data_value.value] = np.nan # Extract necessary values from sourcelist structure star_ids = sourcelist["star_id"].value xs = sourcelist["xcenter"].value ys = sourcelist["ycenter"].value ra = sourcelist["ra"].value dec = sourcelist["dec"].value src_cnt = len(sourcelist) # If RA/Dec are available attempt to use them to determine the source positions if use_coordinates == "sky" and sourcelist.has_ra_dec: try: imgpos = ccd_image.wcs.world_to_pixel( SkyCoord(ra, dec, unit=u.deg, frame="icrs") ) xs, ys = imgpos[0], imgpos[1] except AttributeError: # No WCS, skip this image msg = f"{logline} ccd_image must have a valid WCS to use RA/Dec!" logger.warning(msg) return None, None elif use_coordinates == "sky" and not sourcelist.has_ra_dec: raise ValueError( "use_coordinates='sky' but sourcelist does not have" "RA/Dec coordinates!" ) if photometry_apertures.variable_aperture: # Get a fast, robust estimate of the FWHM of the sources for setting # the aperture size. fwhm = fast_fwhm_from_image( ccd_image, photometry_apertures.fwhm_estimate, noise=camera.read_noise.value, max_adu=camera.max_data_value.value, ) else: # Use the FWHM from the settings fwhm = photometry_apertures.fwhm_estimate # Reject sources that are within an aperture diameter of each other. dropped_sources = [] try: too_close = find_too_close( sourcelist, photometry_apertures.radius_pixels(fwhm), pixel_scale=camera.pixel_scale.value, ) except Exception as err: # Any failure here is BAD, so raise an error raise RuntimeError( f"Call to find_too_close() returned {type(err).__name__}: {str(err)}" ) from err too_close_cnt = np.sum(too_close) non_overlap = ~too_close msg = ( f"{logline} {too_close_cnt} of {src_cnt} sources within 2 aperture radii of " "nearest neighbor" ) if photometry_options.reject_too_close: # Track dropped sources due to being too close together dropped_sources.extend(star_ids[too_close].tolist()) # Remove sources too close together star_ids = star_ids[non_overlap] xs = xs[non_overlap] ys = ys[non_overlap] ra = ra[non_overlap] dec = dec[non_overlap] msg += " ... removed them." else: msg += " ... keeping them." logger.info(msg) # Remove all source positions too close to edges of image (where the annulus would # extend beyond the image boundaries). padding = photometry_apertures.outer_annulus out_of_bounds = ( (xs < padding) | (xs > (ccd_image.shape[1] - padding)) | (ys < padding) | (ys > (ccd_image.shape[0] - padding)) ) in_bounds = ~out_of_bounds # Track dropped sources due to out of bounds positions dropped_sources.extend(star_ids[out_of_bounds].tolist()) # Remove sources too close to the edges star_ids = star_ids[in_bounds] xs = xs[in_bounds] ys = ys[in_bounds] ra = ra[in_bounds] dec = dec[in_bounds] in_cnt = np.sum(in_bounds) out_cnt = np.sum(out_of_bounds) logger.info( f"{logline} {out_cnt} sources too close to image edge ... removed " "them." ) logger.info( f"{logline} {in_cnt} of {src_cnt} original sources to have photometry " "done." ) # If we are using x/y positions previously obtained from the ra/dec positions and # WCS, then recentroid the sources to refine the positions. This is # particularly useful is processing multiple images of the same field # and just passing the same sourcelist when calling single_image_photometry # on each image. if use_coordinates == "sky": try: xcen, ycen = centroid_sources( ccd_image.data, xs, ys, box_size=2 * int(photometry_apertures.radius_pixels(fwhm)) + 1, ) except NoOverlapError: logger.warning( f"{logline} Determining new centroids failed ... " "SKIPPING THIS IMAGE!" ) return None, None else: # Proceed # Calculate offset between centroid in this image and the positions # based on input RA/Dec. center_diff = np.sqrt((xs - xcen) ** 2 + (ys - ycen) ** 2) # The center really shouldn't move more than about the fwhm, could # rework this in the future to use that instead. too_much_shift = ( center_diff > photometry_settings.source_location_settings.shift_tolerance ) # If the shift is too large, use the WCS-derived positions instead # (these sources are probably too faint for centroiding to work well) xcen[too_much_shift] = xs[too_much_shift] ycen[too_much_shift] = ys[too_much_shift] xs, ys = xcen, ycen # Compute RA/Dec if not already provided if not sourcelist.has_ra_dec: try: skypos = ccd_image.wcs.pixel_to_world(xs, ys) ra = skypos.ra.value dec = skypos.dec.value except AttributeError: ra = [np.nan] * len(xs) dec = [np.nan] * len(ys) # Define apertures and annuli for the aperture photometry aper_locs = np.array([xs, ys]).T apers = CircularAperture(aper_locs, r=photometry_apertures.radius_pixels(fwhm)) annuli = CircularAnnulus( aper_locs, r_in=photometry_apertures.inner_annulus, r_out=photometry_apertures.outer_annulus, ) # Perform the aperture photometry photom = aperture_photometry( ccd_image.data, (apers, annuli), mask=ccd_image.mask, method=photometry_options.partial_pixel_method, ) # photutils 2.1.0 stopped returning pixel units for x/y center # https://photutils.readthedocs.io/en/stable/changelog.html#id6 if photom["xcenter"].unit is None: photom["xcenter"] = photom["xcenter"] * u.pixel photom["ycenter"] = photom["ycenter"] * u.pixel # Add source ids to the photometry table photom["star_id"] = star_ids photom["ra"] = ra * u.deg photom["dec"] = dec * u.deg # Drop ID column from aperture_photometry() del photom["id"] # Add various CCD image parameters to the photometry table if fname is not None: photom["file"] = fname else: photom["file"] = [""] * len(photom) # Set various columns based on CCDData headers (which we # checked for earlier) photom["exposure"] = [exposure] * len(photom) * u.second photom["date-obs"] = Time(Column(data=[date_obs])) photom["filter"] = [filter] * len(photom) photom.rename_column("filter", "passband") # Check for airmass keyword in header and set 'airmass' if found, # but accept it may not be available try: photom["airmass"] = [ccd_image.header["AIRMASS"]] * len(photom) except KeyError: logger.warning( f"{logline} 'AIRMASS' not found in CCD " "image header ... setting to NaN!" ) photom["airmass"] = [np.nan] * len(photom) # Save aperture and annulus information photom.rename_column("aperture_sum_0", "aperture_sum") photom.rename_column("aperture_sum_1", "annulus_sum") photom["aperture_sum"].unit = ccd_image.unit photom["annulus_sum"].unit = ccd_image.unit photom["aperture"] = apers.r * u.pixel photom["annulus_inner"] = annuli.r_in * u.pixel photom["annulus_outer"] = annuli.r_out * u.pixel # By convention, area is in units of pixels (not pixels squared) in a digital image photom["aperture_area"] = apers.area * u.pixel photom["annulus_area"] = annuli.area * u.pixel if photometry_options.reject_background_outliers: msg = f"{logline} Computing clipped sky stats ... " try: ( avg_sky_per_pix, med_sky_per_pix, std_sky_per_pix, ) = clipped_sky_per_pix_stats(ccd_image, annuli) except AttributeError: msg += "BAD ANNULUS ('sky_per_pix' stats set to np.nan) ... " avg_sky_per_pix, med_sky_per_pix, std_sky_per_pix = np.nan, np.nan, np.nan photom["sky_per_pix_avg"] = avg_sky_per_pix / u.pixel photom["sky_per_pix_med"] = med_sky_per_pix / u.pixel photom["sky_per_pix_std"] = std_sky_per_pix / u.pixel msg += "DONE." logger.info(msg) else: # Don't reject outliers (but why would you do this?) logger.warning( f"{logline} SUGGESTION: You are computing sky per pixel " "without clipping (set reject_background_outliers=True " "to perform clipping)." ) med_pp = [] std_pp = [] for mask in annuli.to_mask(): annulus_data = mask.cutout(ccd_image) med_pp.append(np.median(annulus_data)) std_pp.append(np.std(annulus_data)) photom["sky_per_pix_avg"] = photom["annulus_sum"] / photom["annulus_area"] photom["sky_per_pix_med"] = np.array(med_pp) * ccd_image.unit / u.pixel photom["sky_per_pix_std"] = np.array(std_pp) * ccd_image.unit / u.pixel # Compute counts using clipped stats on sky per pixel photom["aperture_net_cnts"] = photom["aperture_sum"].value - ( photom["aperture_area"].value * photom["sky_per_pix_avg"].value ) photom["aperture_net_cnts"].unit = ccd_image.unit # Fit the FWHM of the sources (can result in many warnings due to # failed FWHM fitting, capture those warnings and print a summary) msg = f"{logline} Fitting FWHM of all sources (may take a few minutes) ... " with warnings.catch_warnings(record=True) as warned: warnings.filterwarnings("always", category=AstropyUserWarning) fwhm_x, fwhm_y = compute_fwhm( ccd_image, photom, fwhm_estimate=photometry_apertures.fwhm_estimate, fit_method=photometry_options.fwhm_method, sky_per_pix_column="sky_per_pix_avg", ) num_warnings = len(warned) msg += f"fitting failed on {num_warnings} of {len(photom)} sources ... " msg += "DONE." logger.info(msg) # Deal with bad FWHM values bad_fwhm = (fwhm_x < 1) | (fwhm_y < 1) # Set bad values to NaN now fwhm_x[bad_fwhm] = np.nan fwhm_y[bad_fwhm] = np.nan photom["fwhm_x"] = fwhm_x * u.pixel photom["fwhm_y"] = fwhm_y * u.pixel photom["width"] = ((fwhm_x + fwhm_y) / 2) * u.pixel if np.sum(bad_fwhm) > 0: logger.info( f"{logline} Bad FWHM values (<1 pixel) for {np.sum(bad_fwhm)} " "sources." ) # Flag sources with bad counts before computing noise. # This can happen, for example, when the object is faint and centroiding is # bad. It can also happen when the sky background is low. bad_cnts = photom["aperture_net_cnts"].value < 0 # This next line works because booleans are just 0/1 in numpy if np.sum(bad_cnts) > 0: logger.info( f"{logline} Aperture net counts negative for {np.sum(bad_cnts)} " "sources." ) all_bads = bad_cnts | bad_fwhm photom["aperture_net_cnts"][all_bads] = np.nan logger.info( f"{logline} {np.sum(all_bads)} sources with either bad FWHM fit " "or bad aperture net counts had aperture_net_cnts set to NaN." ) # Compute instrumental magnitudes photom["mag_inst"] = -2.5 * np.log10( camera.gain.value * photom["aperture_net_cnts"].value / photom["exposure"].value ) # Compute and save noise msg = f"{logline} Calculating noise for all sources ... " noise = calculate_noise( camera=camera, counts=photom["aperture_net_cnts"].value, sky_per_pix=photom["sky_per_pix_avg"].value, aperture_area=photom["aperture_area"].value, annulus_area=photom["annulus_area"].value, exposure=photom["exposure"].value, include_digitization=photometry_options.include_dig_noise, ) photom["noise_electrons"] = noise # Noise in electrons photom["noise_electrons"].unit = u.electron photom["noise_cnts"] = noise / camera.gain.value # Noise in counts photom["noise_cnts"].unit = ccd_image.unit # Compute and save SNR snr = camera.gain * photom["aperture_net_cnts"] / photom["noise_electrons"] # If the SNR is dimensionless, convert it to a plain number if snr.unit.physical_type == "dimensionless": snr = snr.value photom["snr"] = snr photom["mag_error"] = 1.085736205 / snr msg += "DONE." logger.info(msg) # Close logfile if it was created if fh_exists: fh.flush() fh.close() # Remove logger handler logger.handlers.clear() # Create PhotometryData object to return photom_data = PhotometryData( observatory=observatory, camera=camera, input_data=photom, passband_map=passband_map, ) return photom_data, dropped_sources
[docs] def multi_image_photometry( directory_with_images, photometry_settings, reject_unmatched=True, object_of_interest=None, ): """ Perform aperture photometry on a directory of images. Parameters ---------- directory_with_images : str Folder containing the images on which to do photometry. Photometry will only be done on images that contain the ``object_of_interest``. All images *must* have WCS headers and the following headers: OBJECT, DATE-OBS, an exposure time header (which can be any of the following: EXPOSURE, EXPTIME, TELAPSE, ELAPTIME, ONTIME, or LIVETIME), and FILTER. If AIRMASS is available it will be added to `phot_table`. photometry_settings : `stellarphot.settings.PhotometrySettings` Photometry settings to use for the photometry. This includes the camera, observatory, the aperture and annulus radii to use for the photometry, a map of passbands from your filters to AAVSO filter names, and options for the photometry. See `stellarphot.settings.PhotometrySettings` for more information. reject_unmatched : bool, optional (Default: True) If ``True``, any sources that are not detected on all the images are rejected. If you are interested in a source that can intermittently fall below your detection limits, we suggest setting this to ``False`` so that all sources detected on each image are reported. object_of_interest : str, optional (Default: None) Name of the object of interest. The only files on which photometry will be done are those whose header contains the keyword ``OBJECT`` whose value is ``object_of_interest``. *Only used for multi-image photometry* to select which files to perform photometry on. Returns ------- phot_table : `stellarphot.PhotometryData` Photometry data for all the sources on which aperture photometry was performed in all the images. This may be a subset of the sources in the sourcelist if locations were too close to the edge of any one image or to each other for successful aperture photometry. """ sourcelist = SourceListData.read( photometry_settings.source_location_settings.source_list_file ) # Initialize lists to track all PhotometryData objects and all dropped sources phots = [] missing_sources = [] # Confirm sourcelist has ra/dec coordinates if not sourcelist.has_ra_dec: raise ValueError( "multi_image_photometry: sourcelist must have RA/Dec " "coordinates to use this function." ) # Set up logging (retrieve a logger but purge any existing handlers) multilogger = logging.getLogger("multi_image_photometry") multilogger.setLevel(logging.INFO) console_format = logging.Formatter("%(message)s") # Remove all other existing handlers from the logger # (Kind of brute force, but works for our purposes for handler in multilogger.handlers[:]: multilogger.removeHandler(handler) logfile = photometry_settings.logging_settings.logfile console_log = photometry_settings.logging_settings.console_log # Set up logging: # Check if logfile is not None, set up logging to be written to the logfile. # Next check if console_log is True, if it is, set up logging to be written # to the console. # If neither of these are true, set up logging to be written to NullHandler # which effectively suppresses the logging output. # Set up logging to a file (in addition to any logging below) if logfile is not None: # Get the name of the logfile without the path logfile = Path(logfile).name # Redirect the logfile to the directory_with_images logfile = Path(directory_with_images) / logfile # Change the settings so when they are passed to single_image_photometry # the logging will be written to the same logfile photometry_settings.logging_settings.logfile = str(logfile) # Keep original name without path logfile_name = str(Path(logfile).name) # by default this appends to existing logfile fh = logging.FileHandler(logfile) log_format = logging.Formatter("%(levelname)s - %(message)s") fh.setFormatter(log_format) fh.setLevel(logging.INFO) multilogger.addHandler(fh) # Set up logging to console if requested if console_log: ch = logging.StreamHandler() else: # otherwise effectively suppress output ch = logging.NullHandler() ch.setFormatter(console_format) ch.setLevel(logging.INFO) multilogger.addHandler(ch) ## ## Process all the individual files ## # Build image file collection ifc = ImageFileCollection(directory_with_images) # Disable any root logger handlers that are active before using # logging (must be done here because ImageFileCollection() creates # a logger) if logging.root.hasHandlers(): logging.root.handlers.clear() n_files_processed = 0 msg = f"Starting photometry of files in {directory_with_images} ... " if logfile is not None: msg += f"logging output to {logfile_name}" # If not logging to console, print message here if not console_log: print(msg) multilogger.info(msg) # Suppress the FITSFixedWarning that is raised when reading a FITS file header warnings.filterwarnings("ignore", category=FITSFixedWarning) # Process all the files for this_ccd, this_fname in ifc.ccds(object=object_of_interest, return_fname=True): multilogger.info(f"multi_image_photometry: Processing image {this_fname}") if this_ccd.wcs is None: multilogger.warning(" .... SKIPPING THIS IMAGE (NO WCS)") continue # Call single_image_photometry on each image n_files_processed += 1 multilogger.info(" Calling single_image_photometry ...") this_phot, this_missing_sources = single_image_photometry( this_ccd, photometry_settings, fname=this_fname, logline=" >", ) if (this_phot is None) or (this_missing_sources is None): multilogger.info(" single_image_photometry failed for this image.") else: multilogger.info( f" Done with single_image_photometry for {this_fname}\n\n" ) # Extend the list of missing stars missing_sources.extend(this_missing_sources) # And add the final table to the list of tables phots.append(this_phot) if n_files_processed == 0: raise RuntimeError("No images were processed!") ## ## Done processing individual images, now combine them into one table ## # Combine all of the individual photometry tables into one. # Attributes should survive intact assume all have same camera and observatory. all_phot = vstack(phots) # If requested, eliminate source not detected on every image by building # a set of all the unique star_ids that were missing on at least one image. if reject_unmatched and len(missing_sources) > 0: if len(missing_sources) > 1: uniques = set(missing_sources) else: uniques = set([missing_sources]) msg = f" Removing {len(uniques)} sources not observed in every image ... " # Purge the photometry table of all sources that were eliminated # on at least one image starid_to_remove = sorted([u for u in uniques if u in all_phot["star_id"]]) # add index to PhotometryData to speed up removal all_phot.add_index("star_id") # Remove the starid for objects not observed in every image if starid_to_remove: bad_rows = all_phot.loc_indices[starid_to_remove] try: bad_rows = list(bad_rows) except TypeError: bad_rows = [bad_rows] all_phot.remove_indices("star_id") all_phot.remove_rows(sorted(bad_rows)) # Drop index from PhotometryData to save memory all_phot.remove_indices("star_id") msg += "DONE." multilogger.info(msg) multilogger.info( f" DONE processing all matching images in {directory_with_images}" ) if logfile is not None and not console_log: print(f" DONE processing all matching images in {directory_with_images}") # Close the logfile if it is open if logfile is not None: fh.flush() fh.close() # Remove logger handler multilogger.handlers.clear() return all_phot
[docs] def find_too_close(sourcelist, aperture_rad, pixel_scale=None): """ Identify sources that are closer together than twice the aperture radius. If 'xcenter' and 'ycenter' are available in the sourcelist (as determined by the value of sourcelist.has_x_y), they are used to determine separation of sources in units of pixels, otherwise if only ra/dec are available, the pixel_scale is necessary to determine the separation. Parameters ---------- sourcelist : `stellarphot.SourceListData` A list of sources with x/y and/or RA/Dec coordinates. aperture_rad : int Radius of the aperture, in pixels. pixel_scale : `float, optional` (Default: None) Pixel scale of the image in arcsec/pixel. Only required if x/y coordinates are NOT provided. Returns ------- numpy array of bool Array the same length as the RA/Dec that is ``True`` where the sources are closer than two aperture radii, ``False`` otherwise. """ if not isinstance(sourcelist, SourceListData): raise TypeError( "sourcelist must be of type SourceListData not " f"'{type(sourcelist)}'" ) if not isinstance(pixel_scale, float): raise TypeError(f"pixel_scale must be a float not '{type(pixel_scale)}'") if sourcelist.has_x_y: x, y = sourcelist["xcenter"], sourcelist["ycenter"] # Find the pixel distance to the nearest neighbor for each source dist_mat = cdist(np.array([x, y]).T, np.array([x, y]).T, metric="euclidean") np.fill_diagonal(dist_mat, np.inf) # Return array with True where the distance is less than twice the aperture # radius return dist_mat.min(0) < 2 * aperture_rad elif sourcelist.has_ra_dec: if pixel_scale is None: raise ValueError( "pixel_scale must be provided if x/y coordinates are " "not available in the sourcelist." ) star_coords = SkyCoord( ra=sourcelist["ra"], dec=sourcelist["dec"], frame="icrs", unit="degree" ) idxc, d2d, d3d = star_coords.match_to_catalog_sky(star_coords, nthneighbor=2) return d2d < (aperture_rad * 2 * pixel_scale * u.arcsec) else: raise ValueError("sourcelist must have x/y or ra/dec coordinates")
[docs] def clipped_sky_per_pix_stats(data, annulus, sigma=5, iters=5): """ Calculate sigma-clipped statistics on an annulus. Parameters ---------- data : `astropy.nddata.CCDData` CCD image on which the annuli are defined. annulus : `photutils.CircularAnnulus` One or more annulus (of any shape) from photutils. sigma : float, optional Number of standard deviations from the central value a pixel must be to reject it. iters : int, optional Maximum number of sigma clip iterations to perform. Iterations stop automatically if no pixels are rejected. Returns ------- avg_sky_per_pix, med_sky_per_pix, std_sky_per_pix : `astropy.units.Quantity` Average, median and standard deviation of the sky per pixel. """ # Use ApertureStats from photutils to get what we need. sigclip = SigmaClip(sigma=sigma, maxiters=iters) ap_stats = ApertureStats(data, annulus, sigma_clip=sigclip, sum_method="center") return ( ap_stats.mean, ap_stats.median, ap_stats.std, )
[docs] def calculate_noise( camera=None, counts=0.0, sky_per_pix=0.0, aperture_area=0, annulus_area=0, exposure=0, include_digitization=False, ): """ Computes the noise in a photometric measurement. This function computes the noise (in units of electrons) in a photometric measurement using the revised CCD equation from Collins et al (2017) AJ, 153, 77 who based their expression on the one originally proposed for SNR by Merline, W. J., & Howell, S. B. 1995, Experimental Astronomy, 6, 163. The equation is: .. math:: \\sigma = \\sqrt{G \\cdot N_C + A \\cdot \\left(1 + \\frac{A}{B}\\right)\\cdot \\left[ G\\cdot S + D \\cdot t + R^2 + (0.289 G)^2\\right]} where :math:`\\sigma` is the noise, :math:`G` is the gain, :math:`N_C` is the source counts (which is photon/electron counts divided by gain), :math:`A` is the aperture area in pixels, :math:`B` is the annulus area in pixels, :math:`S` is the sky counts per pixel, :math:`D` is the dark current in electrons per second, :math:`R` is the read noise in electrons per pixel per read, and :math:`t` is exposure time in seconds. Note: The :math:`(0.289 G)^2` term is "digitization noise" and is optional. Parameters ---------- camera : `stellarphot.Camera` The camera object that contains the gain, read noise and dark current of the CCD. counts : float, optional Counts of the source. sky_per_pix : float, optional Sky per pixel in counts per pixel. aperture_area : int, optional Area of the aperture in pixels. annulus_area : int, optional Area of the annulus in pixels. exposure : int, optional Exposure time in seconds. include_digitization : bool, optional Whether to include the digitization noise. Defaults to False. Returns ------- noise : float The noise in the photometric measurement in electrons. """ if camera is None: raise ValueError("camera must be provided") elif not isinstance(camera, Camera): raise ValueError(f"camera must be of type Camera not '{type(camera)}'") # Extract values from camera object gain = camera.gain.value dark_current_per_sec = camera.dark_current.value read_noise = camera.read_noise.value try: no_annulus = (annulus_area == 0).all() except AttributeError: no_annulus = annulus_area == 0 if no_annulus: area_ratio = aperture_area else: area_ratio = aperture_area * (1 + aperture_area / annulus_area) # Convert counts to electrons poisson_source = gain * counts try: poisson_source = poisson_source.value except AttributeError: pass sky = area_ratio * gain * sky_per_pix dark = area_ratio * dark_current_per_sec * exposure rn_error = area_ratio * read_noise**2 digitization = 0.0 if include_digitization: digitization = area_ratio * (gain * 0.289) ** 2 return np.sqrt(poisson_source + sky + dark + rn_error + digitization)