Source code for photometry.halo.halo_photometry

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Halo Photometry.

.. codeauthor:: Benjamin Pope <benjamin.pope@nyu.edu>
.. codeauthor:: Tim White <white@phys.au.dk>
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

import logging
import os.path
import contextlib
import numpy as np
from astropy.table import Table
import halophot
from halophot.halo_tools import do_lc
from ..plots import matplotlib, plt, plot_image, save_figure
from .. import BasePhotometry, STATUS
from ..quality import TESSQualityFlags
from ..utilities import mag2flux, LoggerWriter

#--------------------------------------------------------------------------------------------------
[docs] class HaloPhotometry(BasePhotometry): """ Use halo photometry to observe very saturated stars. .. codeauthor:: Benjamin Pope <benjamin.pope@nyu.edu> .. codeauthor:: Tim White <white@phys.au.dk> .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """
[docs] def __init__(self, *args, **kwargs): # Call the parent initializing: # This will set several default settings super().__init__(*args, **kwargs)
#----------------------------------------------------------------------------------------------
[docs] def do_photometry(self): """ Performs 'halo' TV-min weighted-aperture photometry. Parameters ---------- aperture_mask : array-like, 'pipeline', or 'all' A boolean array describing the aperture such that `False` means that the pixel will be masked out. If the string 'all' is passed, all pixels will be used. The default behavior is to use the Kepler pipeline mask. splits : tuple, (None, None) or (2152,2175) etc. A tuple including two times at which to split the light curve and run halo separately outside these splits. sub : int Do you want to subsample every nth pixel in your light curve? Not advised, but can come in handy for very large TPFs. maxiter: int Number of iterations to optimize. 101 is default & usually sufficient. w_init: None or array-like. Initialize weights with a particular weight vector - useful if you have already run TV-min and want to update, but otherwise set to None and it will have default initialization. random_init: Boolean If False, and w_init is None, it will initialize with uniform weights; if True, it will initialize with random weights. False is usually better. thresh: float A float greater than 0. Pixels less than this fraction of the maximum flux at any pixel will be masked out - this is to deal with saturation. Because halo is usually intended for saturated stars, the default is 0.8, to deal with saturated pixels. If your star is not saturated, set this greater than 1.0. objective: string Objective function: can be tv, tv_o2, l2v, or l3v. analytic: Boolean If True, it will optimize the TV with autograd analytic derivatives, which is several orders of magnitude faster than with numerical derivatives. This is by default True but you can run it numerically with False if you prefer. sigclip: Boolean If True, it will iteratively run the TV-min algorithm clipping outliers. Use this for data with a lot of outliers, but by default it is set False. """ # Start logger to use for print logger = logging.getLogger(__name__) logger.info("starid: %d", self.starid) # Halophot settings: splits = (None, None) sub = 1 maxiter = 101 w_init = None random_init = False thresh = -1 minflux = -100.0 objective = 'tv' analytic = True sigclip = False dist_max = 20.0 # In the case of FFI, set the size of the postage stamp to be just slightly larger than # the maximum distance from the target (allowing for one pixel on each side): if self.datasource == 'ffi': self.resize_stamp(width=dist_max+2, height=dist_max+2) logger.info("Target position in stamp: (%f, %f)", self.target_pos_row_stamp, self.target_pos_column_stamp) # Initialize logger.info('Formatting data for halo') indx_goodtimes = np.isfinite(self.lightcurve['time']) time = self.lightcurve['time'][indx_goodtimes] flux = self.images_cube.T[indx_goodtimes, :, :] # Cut out pixels closer than 20 pixels, that were actually observed: # TODO: We should maybe use self.resize_stamp instead, at least for FFIs. # TODO: Should the limit scale with Tmag? # TODO: Is there a one pixel offset in dist? cols, rows = self.get_pixel_grid() dist = np.sqrt((cols - self.target_pos_column)**2 + (rows - self.target_pos_row)**2) pixel_mask = (self.aperture & 1 != 0) & (dist <= dist_max) # Cut out the pixel mask in flux cube: flux[:, ~pixel_mask.T] = np.NaN # Find timestamps where the timeseries should be split: if self.sector == 1: split_times = (1339., 1347.366, 1349.315) elif self.sector == 2: split_times = (1368.,) elif self.sector == 3: split_times = (1395.52,) elif self.sector == 8: split_times = (1529.50,) else: # If nothing has been explicitely defined for this sector, # let's see of there is a large gap somewhere near the middle of # timeseries, that is properly due to the data downlink. # If a single hole is found, use it, otherwise don't try to be # to clever, and just don't split the timeseries. timecorr = self.lightcurve['timecorr'][indx_goodtimes] t = time - timecorr dt = np.append(np.diff(t), 0) t0 = np.nanmin(t) Ttot = np.nanmax(t) - t0 indx = (t0+0.30*Ttot < t) & (t < t0+0.70*Ttot) & (dt > 0.5) if np.sum(indx) == 1: indx = np.where(indx)[0][0] thole = 0.5*(t[indx] + t[indx+1]) + timecorr[indx] logger.info("Automatically found split: %f", thole) split_times = (thole,) else: logger.warning("No split-timestamps have been defined for this sector") split_times = None # TODO: Is this correct? # Ensure that we are not splitting outside the provided time: if split_times is not None: split_times = tuple([st for st in split_times if np.min(time) < st < np.max(time)]) if not split_times: split_times = None logger.debug("Split times: %s", split_times) # Get the position of the main target col = self.target_pos_column + self.lightcurve['pos_corr'][:, 0] row = self.target_pos_row + self.lightcurve['pos_corr'][:, 1] # Put together timeseries table in the format that halophot likes: ts = Table({ 'time': time, 'cadence': self.lightcurve['cadenceno'][indx_goodtimes], 'x': col[indx_goodtimes], 'y': row[indx_goodtimes], 'quality': self.lightcurve['quality'][indx_goodtimes] }) # Run the halo photometry core function try: # Redirect stdout to logger.info with contextlib.redirect_stdout(LoggerWriter(logger)): pf, ts, weights, weightmap_dict, pixels_sub = do_lc( flux, ts, splits, sub, maxiter=maxiter, split_times=split_times, w_init=w_init, random_init=random_init, thresh=thresh, minflux=minflux, objective=objective, analytic=analytic, sigclip=sigclip, verbose=logger.isEnabledFor(logging.INFO), mission='TESS', bitmask=TESSQualityFlags.DEFAULT_BITMASK ) except: # noqa: E722 logger.exception('Halo optimization failed') return STATUS.ERROR # Fix for halophot sometimes not returning lists: for key, value in weightmap_dict.items(): if not isinstance(value, list): weightmap_dict[key] = [value] # Rescale the extracted flux: normfactor = mag2flux(self.target['tmag']) self.lightcurve['flux'][indx_goodtimes] = ts['corr_flux'] * normfactor # Create mapping from each cadence to which weightmap was used: wmindx = np.zeros_like(indx_goodtimes, dtype=int) for k, (cad1, cad2) in enumerate(zip(weightmap_dict['initial_cadence'], weightmap_dict['final_cadence'])): wmindx[(self.lightcurve['cadenceno'] >= cad1) & (self.lightcurve['cadenceno'] <= cad2)] = k # Calculate the flux error by uncertainty propergation: for k, imgerr in enumerate(self.images_err): if not indx_goodtimes[k]: continue wm = weightmap_dict['weightmap'][wmindx[k]] # Get the weightmap for this cadence self.lightcurve['flux_err'][k] = np.abs(normfactor) * np.sqrt(np.nansum( wm**2 * imgerr**2 )) self.lightcurve['pos_centroid'][:,0] = col # we don't actually calculate centroids self.lightcurve['pos_centroid'][:,1] = row # Save the weightmap into special property which will make sure # that it is saved into the final FITS output files: self.halo_weightmap = weightmap_dict # Plotting: if self.plot: logger.info('Plotting weight map') for k, wm in enumerate(weightmap_dict['weightmap']): # Create normalization for plot: wm[~pixel_mask] = np.NaN vlim = np.nanmax(np.abs(wm)) norm = matplotlib.colors.SymLogNorm(linthresh=0.1*vlim, vmin=-vlim, vmax=vlim, base=10) # Create plot: fig, ax = plt.subplots() plot_image(wm, ax=ax, scale=norm, title='TV-min Weightmap', cmap='bwr', cbar='right', clabel='Weights') save_figure(os.path.join(self.plot_folder, f'{self.starid:d}_weightmap_{k+1:d}'), fig=fig) plt.close(fig) # Add additional headers specific to this method: self.additional_headers['HALO_VER'] = (halophot.__version__, 'Version of halophot') self.additional_headers['HALO_OBJ'] = (objective, 'Halophot objective function') self.additional_headers['HALO_THR'] = (thresh, 'Halophot saturated pixel threshold') self.additional_headers['HALO_MXI'] = (maxiter, 'Halophot maximum optimisation iterations') self.additional_headers['HALO_SCL'] = (sigclip, 'Halophot sigma clipping enabled') self.additional_headers['HALO_MFL'] = (minflux, 'Halophot minimum flux') # Return mask used for photometry: self.final_phot_mask = pixel_mask # Check if there are other targets in the mask that could then be skipped from # processing, and report this back to the TaskManager. The TaskManager will decide # if this means that this target or the other targets should be skipped in the end. cols, rows = self.get_pixel_grid() skip_targets = [t['starid'] for t in self.catalog if t['starid'] != self.starid and np.any(pixel_mask & (rows == np.round(t['row'])+1) & (cols == np.round(t['column'])+1))] if skip_targets: logger.info("These stars could be skipped: %s", skip_targets) self.report_details(skip_targets=skip_targets) # Return whether you think it went well: return STATUS.OK