Source code for stellarphot.photometry.source_detection

import warnings

import numpy as np
from astropy import units as u
from astropy.nddata import CCDData, block_reduce
from astropy.nddata.utils import Cutout2D
from astropy.stats import sigma_clipped_stats
from astropy.table import Table
from astropy.utils.exceptions import AstropyUserWarning
from photutils.detection import DAOStarFinder
from photutils.morphology import data_properties
from photutils.profiles import RadialProfile
from photutils.psf import fit_2dgaussian, fit_fwhm

from stellarphot.core import SourceListData
from stellarphot.settings.models import FwhmMethods

__all__ = ["source_detection", "compute_fwhm", "fast_fwhm_from_image"]


[docs] def compute_fwhm( ccd, sources, fwhm_estimate=5, x_column="xcenter", y_column="ycenter", fit_method=FwhmMethods.FIT, # This matches the old default sky_per_pix_avg=None, sky_per_pix_column=None, ): """ Computes the FWHM in both x and y directions of sources in an image. Parameters ---------- ccd : `astropy.nddata.CCDData` or `numpy.ndarray` The CCD Image array. sources : `astropy.table.Table` An astropy table of the positions of sources in the image. fwhm_estimate : float, optional (default=5) The initial guess for the FWHM of the sources in the image. x_column : str, optional (default='xcenter') The name of the column in `sources` that contains the x positions of the sources. y_column : str, optional (default='ycenter') The name of the column in `sources` that contains the y positions of the sources. fit : bool, optional (default=True) If ``True``, fit a 2D Gaussian to each source to estimate its FWHM. If ``False``, estimate the FWHM of each source by computing the second moments of its light distribution using photutils. sky_per_pix_avg : float or array-like of float, optional (default=0) Sky background to subtract before centroiding. Cannot specify both `sky_per_pix_avg` and `sky_per_pix_column`. sky_per_pix_column : str, optional The name of the column in `sources` that contains the sky background values for each source. This is used to subtract the sky background from the cutout before computing the FWHM. Cannot specify both `sky_per_pix_avg` and `sky_per_pix_column`. Returns ------- fwhm_x, fwhm_y : tuple of np.array float The FWHM of each source in the x and y directions. """ data = ccd.data if isinstance(ccd, CCDData) else ccd if sky_per_pix_avg is not None and sky_per_pix_column is not None: raise ValueError( "Cannot specify both `sky_per_pix_avg` and `sky_per_pix_column`." ) if sky_per_pix_column is not None: if sky_per_pix_column not in sources.colnames: raise ValueError(f"Column {sky_per_pix_column} not found in sources table.") sky_values = sources[sky_per_pix_column] if sky_per_pix_avg is not None: # User gave a value for the sky background to subtract sky_values = [sky_per_pix_avg] * len(sources) elif sky_per_pix_column is None: # User didn't give a value for the sky background to subtract # so try an of estimate it from the image sky_values = [np.nanmedian(ccd.data)] * len(sources) fwhm_x = [] fwhm_y = [] for source, sky in zip(sources, sky_values, strict=True): x = source[x_column] y = source[y_column] # Cutout2D needs no units on the center position, so remove unit # if it is present. try: x = x.value y = y.value sky = sky.value except AttributeError: pass # SKY SUBTRACT STUFF!! cutout = Cutout2D(data - sky, (x, y), 5 * fwhm_estimate) # Mask any NaNs in the data nan_mask = np.isnan(cutout.data) inp_mask = getattr(ccd, "mask", None) if inp_mask is not None: inp_mask = inp_mask[cutout.slices_original] mask = inp_mask | nan_mask else: mask = nan_mask cutout_xy = cutout.to_cutout_position((x, y)) match fit_method: case FwhmMethods.FIT: # Make sure we get an odd fits shape fit_shape = int(2 * ((5 * fwhm_estimate) // 2) + 1) # fit_fwhm is supposed to handle NaNs automatically but it doesn't # as of photutils 2.2.0 # see https://github.com/astropy/photutils/issues/2029 # For now replace any NaN with zero and hope for the best. cutout.data[nan_mask] = 0 fit = fit_fwhm( cutout.data, xypos=cutout_xy, fwhm=fwhm_estimate, fit_shape=fit_shape, mask=mask, ) fit = fit[0] fwhm_x.append(fit) # gaussian_sigma_to_fwhm * fit.x_stddev_1) fwhm_y.append(fit) # gaussian_sigma_to_fwhm * fit.y_stddev_1) # print('Still fitting!!') case FwhmMethods.MOMENTS: sc = data_properties(cutout.data) fwhm_xm = sc.fwhm.value fwhm_ym = fwhm_xm fwhm_x.append(fwhm_xm) fwhm_y.append(fwhm_ym) case FwhmMethods.PROFILE: radii = np.arange(int(3 * fwhm_estimate)) profile = RadialProfile(cutout.data, cutout_xy, radii) fwhm = profile.gaussian_fwhm fwhm_x.append(fwhm) fwhm_y.append(fwhm) case _: raise ValueError(f"Unknown fit method: {fit_method}") return np.array(fwhm_x), np.array(fwhm_y)
[docs] def source_detection( ccd, fwhm=8, sigma=3.0, iters=5, threshold=10.0, stddev=None, find_fwhm=True, sky_per_pix_avg=0, padding=0, verbose=False, ): """ Returns an SourceListData object containing the position of sources within the image identified using `photutils.DAOStarFinder` algorithm. It can also compute the FWHM of each source by fitting a 2D Gaussian which can be useful for choosing a useful aperture size for photometry. Parameters ---------- ccd : `numpy.ndarray` or `astropy.nddata.CCDData` This is the 2-D array of the image to be analyzed. If a CCDData object is passed, its data attribute will be used. fwhm : float, optional (default=8) Initial estimate of full-width half-max of stars in the image, in pixels. sigma : float, optional. (default=3.0) The number of standard deviations to use as the lower and upper clipping limit when calculating the image background. iters : int, optional (default=5) The number of iterations to perform sigma clipping threshold : float, optional. (default=10.0) The number of standard deviations above which to select sources. stddev : float, optional If provided, this value will be used as the standard deviation of the image. If not provided, the standard deviation will be calculated using `sigma_clipped_stats` on the image. find_fwhm : bool, optional (default=True) If ``True``, estimate the FWHM of each source by fitting a 2D Gaussian to it. If ``False``, the 'fwhm_x', 'fwhm_y', and 'width' columns of output table will be filled with NaN values. sky_per_pix_avg : float or None, optional (default=None) Sky background to subtract before centroiding. If set to ``None``, it will be estimated using the mean of the sigma_clipped_stats of the image. padding : int, optional (default=0) Distance from the edge of the image to ignore when searching for sources. verbose : bool, optional If ``True``, print additional information about the source detection process. Returns ------- sources: `stellardev.SourceListData` A table of the positions of sources in the image. If `find_fwhm` is ``True``, includes columns for `fwhm_x`, `fwhm_y, and `width`. If the input CCDData object has WCS information, the columns `ra` and `dec` are also populated, otherwise they are set to NaN. """ # if image is a CCDData object, extract the data array if not isinstance(ccd, CCDData) and not isinstance(ccd, np.ndarray): raise ValueError("ccd must be a numpy array or CCDData object") # If user uses units for fwhm or sky_per_pix_avg, convert to value if isinstance(sky_per_pix_avg, u.Quantity): sky_per_pix_avg = sky_per_pix_avg.value if isinstance(fwhm, u.Quantity): fwhm = fwhm.value if verbose: print( "source_detection: You may see a warning about invalid values in the " "input image. This is expected if any pixels are saturated and can be " "ignored." ) # Get statistics of the input image (and use them to estimate the sky background # if not provided). Using clipped stats should hopefully get rid of any # bright stars that might be in the image. Only do this if stddev is not provided. if stddev is None: mean, median, stddev = sigma_clipped_stats(ccd, sigma=sigma, maxiters=iters) else: # Set median to None to indicate sigma clipped stats were not # calculated. median = None if sky_per_pix_avg is None: if median is not None: # We calculated sigma clipped stats, so use them sky_per_pix_avg = median else: # A median is a pretty good estimate of the sky background, so use it # for detection. For *photometry* a better estimate is needed, but for # detection, the median is good enough and much faster than sigma clipping. sky_per_pix_avg = np.nanmedian(ccd.data) if verbose: print(f"source_detection: sky_per_pix_avg set to {sky_per_pix_avg:.4f}") # Identify sources applying DAOStarFinder to a "sky subtracted" # image. if verbose: print( f"source_detection: threshold set to {threshold}* standard deviation " f"({stddev:.4f})" ) print(f"source_detection: Assuming fwhm of {fwhm} for DAOStarFinder") # daofind should be run on background subtracted image # (fails, or at least returns garbage, if sky_per_pix_avg is too low) daofind = DAOStarFinder(fwhm=fwhm, threshold=threshold * stddev) if isinstance(ccd, CCDData): sources = daofind(ccd.data - sky_per_pix_avg) else: sources = daofind(ccd - sky_per_pix_avg) # Identify sources near the edge of the image and remove them # from the source list. padding_smt = "" if padding > 0: src_cnt0 = len(sources) y_lim, x_lim = ccd.shape keep = ( (sources["xcentroid"].value >= padding) & (sources["ycentroid"].value >= padding) & (sources["xcentroid"].value < x_lim - padding) & (sources["ycentroid"].value < y_lim - padding) ) sources = sources[keep] padding_smt = f" (after removing {src_cnt0-len(sources)} sources near the edge)" src_cnt = len(sources) if verbose: print(f"source_detection: {src_cnt} sources identified{padding_smt}.") # If image as WCS, compute RA and Dec of each source try: # Retrieve the RA and Dec of each source as SKyCoord objects, then convert to # arrays of floats to add to table skypos = ccd.wcs.pixel_to_world(sources["xcentroid"], sources["ycentroid"]) sources["ra"] = skypos.ra.value sources["dec"] = skypos.dec.value except AttributeError: # No WCS, so add empty columns sources["ra"] = np.nan * np.ones(src_cnt) sources["dec"] = np.nan * np.ones(src_cnt) # If requested, compute the FWHM of each source if find_fwhm: x, y = compute_fwhm( ccd, sources, fwhm_estimate=fwhm, x_column="xcentroid", y_column="ycentroid", sky_per_pix_avg=sky_per_pix_avg, ) sources["fwhm_x"] = x sources["fwhm_y"] = y sources["width"] = (x + y) / 2 # Average of x and y FWHM # Flag bogus fwhm values returned from fitting (no objects # have a fwhm less than 1 pixel) bad_src = (sources["fwhm_x"] < 1) | (sources["fwhm_y"] < 1) sources["fwhm_x"][bad_src] = np.nan sources["fwhm_y"][bad_src] = np.nan sources["width"][bad_src] = np.nan else: # add empty columns sources["fwhm_x"] = np.nan * np.ones(src_cnt) sources["fwhm_y"] = np.nan * np.ones(src_cnt) sources["width"] = np.nan * np.ones(src_cnt) # Convert sources to SourceListData object by adding # units to the columns units_dict = { "id": None, "ra": u.deg, "dec": u.deg, "xcentroid": u.pix, "ycentroid": u.pix, "fwhm_x": u.pix, "fwhm_y": u.pix, "width": u.pix, } sources = Table(data=sources, units=units_dict) # Rename columns to match SourceListData colnamemap = {"id": "star_id", "xcentroid": "xcenter", "ycentroid": "ycenter"} sl_data = SourceListData(input_data=sources, colname_map=colnamemap) return sl_data
[docs] def fast_fwhm_from_image( ccd, fwhm_estimate, noise=10, n_brightest_sources=30, max_adu=40000, block_size=8, min_block_fwhm=1, aggregate_by="mean", ): """ Compute the FWHM of a CCD image by block reducing it, running source detection on the reduced image, and then computing the FWHM of the detected sources in the original image. Parameters ---------- ccd : `numpy.ndarray` or `astropy.nddata.CCDData` The CCD image array. fwhm_estimate : float The initial estimate for the FWHM of the sources in the image. noise : float, optional The estimated noise in the image. n_brightest_sources : int, optional The number of brightest sources to use for the FWHM estimate. max_adu : float, optional The maximum ADU value for a pixel in the image. Sources with peak fluxes greater than this value will be ignored. block_size : int, optional The size of the blocks to use for block reduction. The image will be divided into blocks of this size, and source detection will be done on the reduced image. min_block_fwhm : float, optional The minimum FWHM to allow after the block reduction. If the estimated FWHM is smaller than this value, the block size will be changed. aggregate_by : str, optional The method to use for aggregating the FWHM estimates. Can be 'mean' or 'median'. If None, the FWHM estimates will not be aggregated. Returns ------- fwhm : float or np.ndarray The FWHM of the sources in the image. If `aggregate_by` is None, this will be an array of FWHM values for each source. Otherwise, it will be a single value. Notes ----- The approach here is to detect sources in the image and measure their FWHM. To keep this reasonably fast, we block reduce the image and then run source detection on the reduced image. We then use the positions of the detected sources to estimate the FWHM of the sources in the original image. Only the brightest sources are used to estimate the FWHM, and sources that are too bright (i.e., have a peak flux greater than `max_adu`) are ignored. """ # Check whether the reduced fwhm is larger than the minimum and reset block_size # if it is too small. if fwhm_estimate / block_size < min_block_fwhm: block_size = int(fwhm_estimate / min_block_fwhm) # Get data and mask from CCDData object if isinstance(ccd, CCDData): data = ccd.data mask = ccd.mask else: data = ccd mask = None with warnings.catch_warnings(): # block_reduce generates some warnings about things like unit, wcs, etc # that are set on ccd but not preserved in the reduced image. # That is expected, so ignore it. warnings.filterwarnings( "ignore", message="The following attributes were set on the data object", category=AstropyUserWarning, ) reduced_data = block_reduce(data, block_size=block_size) # Pick a padding that is about 1% of the block reduced image size padding = int(0.01 * reduced_data.shape[0]) block_sources = source_detection( reduced_data, fwhm=fwhm_estimate / block_size, stddev=noise * block_size, # noise adds in quadrature sky_per_pix_avg=None, # make source_detection do the sky subtraction find_fwhm=False, threshold=20, padding=padding, ) # Reverse sort by peak flux block_sources.sort("peak", reverse=True) # To estimate whether the sources are too bright, divide the peak # flux by the area of the block. If this is greater than max_adu, # then the sources are too bright. too_bright = block_sources["peak"] / block_size**2 > max_adu # Drop the sources that are too bright block_sources = block_sources[~too_bright] # Only keep the n_brightest sources (eventually) n_brightest = min(len(block_sources), int(n_brightest_sources)) # Pad n_brightest a bit in case some of these end up being too bright fwhm_est_sources = block_sources[: int(1.2 * n_brightest)] # Estimate the x and y positions of the sources in the original image fwhm_est_sources["xcenter"] = block_size * (fwhm_est_sources["xcenter"].value + 0.5) fwhm_est_sources["ycenter"] = block_size * (fwhm_est_sources["ycenter"].value + 0.5) # Compute the FWHM of the sources in the original image # Make sure fit_shape is an odd number about 5 times the estimated fwhm fit_shape = int(2 * ((5 * fwhm_estimate) // 2) + 1) # This next bit is a little sneaky. We mask any values in the image larger than # max_adu. After we do the PSF fitting, we only keep results in which there were # no masked pixels in the fit. As a result, stars that are too bright in the image # will be ignored in the event they made it through the block reduction. mask = np.zeros_like(ccd.data, dtype=bool) if mask is None else mask mask |= data > max_adu with warnings.catch_warnings(): # fit_2dgaussian generates some warnings if some of the fits do not converge. # This probably does not actually affect real images, but it affects tests. # We suppress the warnings here and check the fit results for flags that # indicate a bad fit. warnings.filterwarnings( "ignore", message=r"One or more fit\(s\) may not have converged. Please check the", category=AstropyUserWarning, ) # This is faster than calling our own compute_fwhm, so do this. fit = fit_2dgaussian( data - np.nanmedian(data), xypos=list( zip( fwhm_est_sources["xcenter"], fwhm_est_sources["ycenter"], strict=True, ) ), fwhm=fwhm_estimate, fix_fwhm=False, fit_shape=fit_shape, mask=mask, ) results = fit.results # Only keep good fits -- this is where saturated pixels get removed, # because they were masked in the input image and flags=1 means there # were masked pixels in the fit. results = results[results["flags"] == 0] # Estimates of the FWHM that are larger than the fit_shape are bad, so drop # any of those results = results[results["fwhm_fit"] < fit_shape] # Only keep the desired number of sources fwhm = results["fwhm_fit"][:n_brightest] if aggregate_by is not None: match aggregate_by: case "mean": fwhm = np.mean(fwhm) case "median": fwhm = np.median(fwhm) case _: raise ValueError(f"Unknown aggregate_by method: {aggregate_by}") return fwhm