Source code for photometry.psf_photometry

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PSF Photometry.

.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

import os.path
import logging
import numpy as np
from bottleneck import nansum
#from copy import deepcopy
from scipy.optimize import minimize
from . import BasePhotometry, STATUS
from .utilities import mag2flux
from .plots import plt, plot_image_fit_residuals, save_figure, plot_outline

[docs] class PSFPhotometry(BasePhotometry):
[docs] def __init__(self, *args, **kwargs): # Call the parent initializing: # This will set several default settings super().__init__(*args, **kwargs) self.cutoff_radius = 5
#---------------------------------------------------------------------------------------------- def _minimum_aperture(self): # Map of valid pixels that can be included: collected_pixels = (self.aperture & 1 != 0) # Create minimum 2x2 mask around target position: cols, rows = self.get_pixel_grid() mask_main = (( np.abs(cols - self.target_pos_column - 1) <= 1 ) & ( np.abs(rows - self.target_pos_row - 1) <= 1 )) # Return the 2x2 mask, but only the pixels that are actually collected: return mask_main & collected_pixels #---------------------------------------------------------------------------------------------- def _logprior(self, params): # Reshape the parameters into a 2D array: params = params.reshape(len(params)//3, 3) fluxes = params[:, 2] # Fluxes are not allowed to be negative: if np.any(fluxes < 0): return -np.inf return 0 #---------------------------------------------------------------------------------------------- def _lhood(self, params, img, bkg, lhood_stat='Gaussian_d', include_bkg=True): """ Log-likelihood function to be minimized for the PSF fit. Parameters: params (numpy array): Parameters for the PSF integrator. img_bkg (list): List containing the image and background numpy arrays. lhood_stat (string): Determines what statistic to use. Default is ``Gaussian_d``. Can also be ``Gaussian_m`` or ``Poisson``. include_bkg (boolean): Determine whether to include background. Default is ``True``. """ # Reshape the parameters into a 2D array: params = params.reshape(len(params)//3, 3) # Define minimum weights to avoid dividing by 0: minweight = 1e-9 minvar = 1e-9 # Pass the list of stars to the PSF integrator to produce an artificial image: mdl = self.psf.integrate_to_image(params, cutoff_radius=self.cutoff_radius) # Calculate the likelihood value: if lhood_stat.startswith('Gaussian'): if lhood_stat == 'Gaussian_d': if include_bkg: var = np.abs(img + bkg) # can be outside _lhood else: var = np.abs(img) # can be outside _lhood elif lhood_stat == 'Gaussian_m': if include_bkg: var = np.abs(mdl + bkg) # has to be in _lhood else: var = np.abs(mdl) # has to be in _lhood # Add 2nd term of Erwin (2015), eq. (13): var += self.n_readout * self.readnoise**2 / self.gain**2 var[var < minvar] = minvar weightmap = 1 / var weightmap[weightmap < minweight] = minweight # Return the chi2: return nansum( weightmap * (img - mdl)**2 ) elif lhood_stat == 'Poisson': # Prepare model for logarithmic expression by changing zeros to small values: mdl_for_log = mdl mdl_for_log[mdl_for_log < 1e-9] = 1e-9 # Return the Cash statistic: return 2 * nansum( mdl - img * np.log(mdl_for_log) ) elif lhood_stat == 'old_Gaussian': # Return the chi2: return nansum( (img - mdl)**2 / img ) else: raise ValueError("Invalid statistic: '%s'" % lhood_stat) #----------------------------------------------------------------------------------------------
[docs] def do_photometry(self): """PSF Photometry""" logger = logging.getLogger(__name__) # Generate list of stars to fit: cat = self.catalog # Calculate distance from main target: cat['dist'] = np.sqrt((self.target_pos_row_stamp - cat['row_stamp'])**2 + (self.target_pos_column_stamp - cat['column_stamp'])**2) # Only include stars that are close to the main target and that are not much fainter: cat = cat[(cat['dist'] < 5) & (self.target['tmag']-cat['tmag'] > -5)] # Sort the catalog by distance and include at max the five closest stars: # FIXME: Make sure that the main target is in there!!! cat.sort('dist') if len(cat) > 5: cat = cat[:5] # Because the minimize routine used below only likes 1D numpy arrays # we have to restructure the catalog: params0 = np.empty((len(cat), 3), dtype='float64') for k, target in enumerate(cat): params0[k,:] = [target['row_stamp'], target['column_stamp'], mag2flux(target['tmag'])] #params_start = deepcopy(params0) # Save the starting parameters for later params0 = params0.flatten() # Make the parameters into a 1D array # Small aperture around target centre to do MOMF-style aperture correction on: mini_aperture = self._minimum_aperture() # Start looping through the images (time domain): for k, (img, bkg) in enumerate(zip(self.images, self.backgrounds)): # Print timestep index to logger: logger.debug('Current timestep: %d', k) # Set the maximum number of iterations for the minimize routine: if k > 0: maxiter = 500 else: # The first step requires more iterations due to bad starting guess maxiter = 1500 # Run the fitting routine for this image: res = minimize(self._lhood, params0, args=(img, bkg), method='Nelder-Mead', options={'maxiter': maxiter}) logger.debug(res) if res.success: result = res.x result = np.array(result.reshape(len(result)//3, 3)) logger.debug(result) target_flux = result[0, 2] # Calculate final best fit image and residuals: best_fit = self.psf.integrate_to_image(result, cutoff_radius=self.cutoff_radius) residuals = img - best_fit # Perform aperture photometry on the residuals: flux_ap = nansum(residuals[mini_aperture]) logger.debug("Aperture correction: %f%%", 100*flux_ap/target_flux) target_flux += flux_ap # Add the result of the main star to the lightcurve: self.lightcurve['flux'][k] = target_flux self.lightcurve['flux_err'][k] = np.NaN # FIXME: Add errors! self.lightcurve['pos_centroid'][k] = result[0, 0:2] # TODO: use debug figure toggle to decide if to plot and export if self.plot and logger.isEnabledFor(logging.DEBUG): fig = plt.figure() ax_list = plot_image_fit_residuals(fig, img, best_fit, residuals) ax_list[0].scatter(result[:,1], result[:,0], c='r', alpha=0.5) plot_outline(mini_aperture, ax=ax_list[2], color='k', lw=2) fig_name = f'psf_photometry_s{self.sector:02d}_{self.starid:011d}_{k:05d}' save_figure(os.path.join(self.plot_folder, fig_name), fig=fig) plt.close(fig) # In the next iteration, start from the current solution: params0 = res.x else: logger.warning("We should flag that this has not gone well.") self.lightcurve['flux'][k] = np.NaN self.lightcurve['flux_err'][k] = np.NaN self.lightcurve['pos_centroid'][k] = [np.NaN, np.NaN] #self.lightcurve['quality'][k] |= 1 # FIXME: Use the real flag! # Return whether you think it went well: return STATUS.OK