Source code for photometry.AperturePhotometry.photometry

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple Aperture Photometry using K2P2 to define masks.

.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

import numpy as np
from bottleneck import allnan
import logging
from .. import BasePhotometry, STATUS
from ..utilities import mag2flux
from . import k2p2v2 as k2p2

#--------------------------------------------------------------------------------------------------
[docs] class AperturePhotometry(BasePhotometry): """ Simple Aperture Photometry using K2P2 to define masks. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ #----------------------------------------------------------------------------------------------
[docs] def __init__(self, *args, **kwargs): # Call the parent initializing: # This will set several default settings super().__init__(*args, **kwargs)
#---------------------------------------------------------------------------------------------- def _minimum_aperture(self): # Map of valid pixels that can be included: collected_pixels = (self.aperture & 1 != 0) # Create minimum 2x2 mask around target position: cols, rows = self.get_pixel_grid() mask_main = (( np.abs(cols - self.target_pos_column - 1) <= 1 ) & ( np.abs(rows - self.target_pos_row - 1) <= 1 )) # Return the 2x2 mask, but only the pixels that are actually collected: return mask_main & collected_pixels #----------------------------------------------------------------------------------------------
[docs] def do_photometry(self): """Perform photometry on the given target. This function needs to set * self.lightcurve """ logger = logging.getLogger(__name__) logger.info("Running aperture photometry...") k2p2_settings = { 'thresh': 0.8, 'min_no_pixels_in_mask': 4, 'min_for_cluster': 4, 'cluster_radius': np.sqrt(2) + np.finfo(np.float64).eps, 'segmentation': True, 'ws_blur': 0.5, 'ws_thres': 0, 'ws_footprint': 3, 'extend_overflow': True } # For bright saturated stars we allow for more retries: ExpectedFlux = mag2flux(self.target['tmag']) haloswitch_tmag_limit = self.settings.getfloat('haloswitch', 'tmag_limit') haloswitch_flux_limit = self.settings.getfloat('haloswitch', 'flux_limit') allow_retries = 5 if self.target['tmag'] < 6: allow_retries = 10 for retries in range(allow_retries): # Delete any plots left over in the plots folder from an earlier iteration: self.delete_plots() # Create the sum-image: SumImage = self.sumimage logger.info(self.stamp) logger.info("Target position in stamp: (%f, %f)", self.target_pos_row_stamp, self.target_pos_column_stamp ) cat = np.column_stack(( self.catalog['column_stamp'], self.catalog['row_stamp'], self.catalog['tmag'])) logger.info("Creating new masks...") try: masks, background_bandwidth = k2p2.k2p2FixFromSum(SumImage, plot_folder=self.plot_folder, show_plot=False, catalog=cat, **k2p2_settings) masks = np.asarray(masks, dtype='bool') except k2p2.K2P2NoStars: logger.error('No flux above threshold.') masks = np.asarray(0, dtype='bool') using_minimum_mask = False if len(masks.shape) == 0: logger.warning("No masks found. Using minimum aperture.") mask_main = self._minimum_aperture() using_minimum_mask = True else: # Look at the central pixel where the target should be: indx_main = masks[:, int(round(self.target_pos_row_stamp)), int(round(self.target_pos_column_stamp))].flatten() if not np.any(indx_main): logger.warning('No mask found for main target. Using minimum aperture.') mask_main = self._minimum_aperture() using_minimum_mask = True elif np.sum(indx_main) > 1: logger.error('Too many masks.') return STATUS.ERROR else: # Mask of the main target: mask_main = masks[indx_main, :, :].reshape(SumImage.shape) # Find out if we are touching any of the edges: resize_args = {} if np.any(mask_main[0, :]): resize_args['down'] = 10 if np.any(mask_main[-1, :]): resize_args['up'] = 10 if np.any(mask_main[:, 0]): resize_args['left'] = 10 if np.any(mask_main[:, -1]): resize_args['right'] = 10 if resize_args: logger.info("Touching the edges! Retrying.") logger.debug(resize_args) stamp_before = self.stamp sumimage_before = self.sumimage if not self.resize_stamp(**resize_args): resize_args = {} logger.warning("Could not resize stamp any further.") break # It did resize, but let's just check if it tried # to resize in a direction, but it hit the limit. # In that case, let's check if we are already over the "HaloSwitch" limit # Don't do this for secondary targets though. if self.target['tmag'] <= haloswitch_tmag_limit and not self.datasource.startswith('tpf:'): edge = np.zeros_like(mask_main, dtype='bool') if resize_args.get('down') and self.stamp[0] == stamp_before[0]: edge[0, :] = True if resize_args.get('up') and self.stamp[1] == stamp_before[1]: edge[-1, :] = True if resize_args.get('left') and self.stamp[2] == stamp_before[2]: edge[:, 0] = True if resize_args.get('right') and self.stamp[3] == stamp_before[3]: edge[:, -1] = True if np.any(edge): EdgeFlux = np.nansum(sumimage_before[mask_main & edge]) if EdgeFlux/ExpectedFlux > haloswitch_flux_limit: logger.error('Stamp resize hit limit. Haloswitch quick break.') self._details['edge_flux'] = EdgeFlux return STATUS.ERROR else: break # If we reached the last retry but still needed a resize, give up: if resize_args: logger.error('Too many stamp resizes.') return STATUS.ERROR # XY of pixels in frame cols, rows = self.get_pixel_grid() members = np.column_stack((cols[mask_main], rows[mask_main])) # Loop through the images and backgrounds together: for k, (img, imgerr, bck) in enumerate(zip(self.images, self.images_err, self.backgrounds)): flux_in_cluster = img[mask_main] # Calculate flux in mask: if allnan(flux_in_cluster) or np.all(flux_in_cluster == 0): self.lightcurve['flux'][k] = np.NaN self.lightcurve['flux_err'][k] = np.NaN self.lightcurve['pos_centroid'][k, :] = np.NaN #self.lightcurve['quality'][k] |= ? else: self.lightcurve['flux'][k] = np.sum(flux_in_cluster) self.lightcurve['flux_err'][k] = np.sqrt(np.sum(imgerr[mask_main]**2)) # Calculate flux centroid: finite_vals = (flux_in_cluster > 0) if np.any(finite_vals): self.lightcurve['pos_centroid'][k, :] = np.average(members[finite_vals], weights=flux_in_cluster[finite_vals], axis=0) else: self.lightcurve['pos_centroid'][k, :] = np.NaN if allnan(bck[mask_main]): self.lightcurve['flux_background'][k] = np.NaN else: self.lightcurve['flux_background'][k] = np.nansum(bck[mask_main]) # Save the mask to be stored in the outout file: self.final_phot_mask = mask_main self.final_position_mask = mask_main # Add additional headers specific to this method: #self.additional_headers['KP_SUBKG'] = (bool(subtract_background), 'K2P2 subtract background?') self.additional_headers['KP_THRES'] = (k2p2_settings['thresh'], 'K2P2 sum-image threshold') self.additional_headers['KP_MIPIX'] = (k2p2_settings['min_no_pixels_in_mask'], 'K2P2 min pixels in mask') self.additional_headers['KP_MICLS'] = (k2p2_settings['min_for_cluster'], 'K2P2 min pix. for cluster') self.additional_headers['KP_CLSRA'] = (k2p2_settings['cluster_radius'], 'K2P2 cluster radius') self.additional_headers['KP_WS'] = (bool(k2p2_settings['segmentation']), 'K2P2 watershed segmentation') #self.additional_headers['KP_WSALG'] = (k2p2_settings['ws_alg'], 'K2P2 watershed weighting') self.additional_headers['KP_WSBLR'] = (k2p2_settings['ws_blur'], 'K2P2 watershed blur') self.additional_headers['KP_WSTHR'] = (k2p2_settings['ws_thres'], 'K2P2 watershed threshold') self.additional_headers['KP_WSFOT'] = (k2p2_settings['ws_footprint'], 'K2P2 watershed footprint') self.additional_headers['KP_EX'] = (bool(k2p2_settings['extend_overflow']), 'K2P2 extend overflow') # Targets that are in the mask: target_in_mask = [k for k,t in enumerate(self.catalog) if np.any(mask_main & (rows == np.round(t['row'])+1) & (cols == np.round(t['column'])+1))] # Figure out which status to report back: my_status = STATUS.OK # Calculate contamination from the other targets in the mask: if len(target_in_mask) == 0: logger.error("No targets in mask.") contamination = np.nan my_status = STATUS.ERROR elif len(target_in_mask) == 1 and self.catalog[target_in_mask][0]['starid'] == self.starid: contamination = 0 else: # Calculate contamination metric as defined in Lund & Handberg (2014): mags_in_mask = self.catalog[target_in_mask]['tmag'] mags_total = -2.5*np.log10(np.nansum(10**(-0.4*mags_in_mask))) contamination = 1.0 - 10**(0.4*(mags_total - self.target['tmag'])) contamination = np.clip(contamination, 0, None) # Avoid stupid signs due to round-off errors logger.info("Contamination: %f", contamination) if not np.isnan(contamination): self.additional_headers['AP_CONT'] = (contamination, 'AP contamination') # Check if there are other targets in the mask that could then be skipped from # processing, and report this back to the TaskManager. The TaskManager will decide # if this means that this target or the other targets should be skipped in the end. skip_targets = [t['starid'] for t in self.catalog[target_in_mask] if t['starid'] != self.starid] if skip_targets: logger.info("These stars could be skipped: %s", skip_targets) self.report_details(skip_targets=skip_targets) # Figure out which status to report back: if using_minimum_mask: my_status = STATUS.WARNING # Return whether you think it went well: return my_status