# Source code for photometry.linpsf_photometry

```#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Linear PSF Photometry.

Do point spread function photometry with fixed centroids. The flux of
all stars in the image are fitted simultaneously using a linear least
squares method.

.. codeauthor:: Jonas Svenstrup Hansen <jonas.svenstrup@gmail.com>
"""

import numpy as np
import logging
import os.path
from bottleneck import allnan, nansum
from . import BasePhotometry, STATUS
#from .utilities import mag2flux
from .plots import plt, plot_image_fit_residuals, save_figure, plot_outline

#--------------------------------------------------------------------------------------------------
def lsfit(A, b):
"""
Linear least squares fitting by solving Ax=b.
"""

# Try to fit with fast pseudo-inverse method:
try:
return (np.linalg.pinv(A.T.dot(A)).dot(A.T)).dot(b)
except np.linalg.LinAlgError:
pass

# Linear least squares:
return np.linalg.lstsq(A, b, rcond=None)

# Non-negative linear least squares:
#fluxes, rnorm = scipy.optimize.nnls(A, b)

#--------------------------------------------------------------------------------------------------

[docs]
class LinPSFPhotometry(BasePhotometry):

[docs]
def __init__(self, *args, **kwargs):
"""
Linear PSF photometry.

Do point spread function photometry with fixed centroids. The flux of
all stars in the image are fitted simultaneously using a linear least
squares method.

Note:
Inspired by the :py:class:`psf_photometry` class set up by
Rasmus Handberg <rasmush@phys.au.dk>. The code in this
:py:func:`__init__` function as well as the logging, catalog call,
time domain loop structure, catalog star limits and lightcurve
output is copied from that class.

.. codeauthor:: Jonas Svenstrup Hansen <jonas.svenstrup@gmail.com>
"""
# Call the parent initializing:
# This will set several default settings
super().__init__(*args, **kwargs)

#----------------------------------------------------------------------------------------------
def _minimum_aperture(self):
# Map of valid pixels that can be included:
collected_pixels = (self.aperture & 1 != 0)

# Create minimum 2x2 mask around target position:
cols, rows = self.get_pixel_grid()
mask_main = (( np.abs(cols - self.target_pos_column - 1) <= 1 )
& ( np.abs(rows - self.target_pos_row - 1) <= 1 ))

# Return the 2x2 mask, but only the pixels that are actually collected:

#----------------------------------------------------------------------------------------------

[docs]
def do_photometry(self):
"""Linear PSF Photometry
TODO: add description of method and what A and b are
"""

logger = logging.getLogger(__name__)

# Load catalog to determine what stars to fit:
cat = self.catalog
staridx = np.squeeze(np.where(cat['starid'] == self.starid))

# Log full catalog for current stamp:
logger.debug(cat)

# Calculate distance from main target:
cat['dist'] = np.sqrt((cat['row_stamp'][staridx] - cat['row_stamp'])**2
+ (cat['column_stamp'][staridx] - cat['column_stamp'])**2)

# Find indices of stars in catalog to fit:
# (only include stars that are close to the main target and that are
# not much fainter)
indx = (cat['dist'] < 5) & (cat['tmag'][staridx]-cat['tmag'] > -5)
nstars = int(np.sum(indx))

# Get target star index in the reduced catalog of stars to fit:
staridx = np.squeeze(np.where(cat[indx]['starid'] == self.starid))
logger.debug('Target star index: %s', np.str(staridx))

# Preallocate flux sum array for contamination calculation:
fluxes_sum = np.zeros(nstars, dtype='float64')

# Small aperture around target centre to do MOMF-style aperture correction on:
mini_aperture = self._minimum_aperture()

# Start looping through the images (time domain):
for k, img in enumerate(self.images):
# Get catalog at current time in MJD:
cat = self.catalog_attime(self.lightcurve['time'][k] - self.lightcurve['timecorr'][k])

# Reduce catalog to only include stars that should be fitted:
cat = cat[indx]
logger.debug(cat)

# Get the number of pixels in the image:
good_pixels = np.isfinite(img)
npx = int(np.sum(good_pixels))

# Create A, the 2D of vertically reshaped PRF 1D arrays:
A = np.empty([npx, nstars], dtype='float64')
for col, target in enumerate(cat):
# Get star parameters with flux set to 1 and reshape:
params0 = np.atleast_2d([target['row_stamp'], target['column_stamp'], 1.])

# Fill out column of A with reshaped PRF array from one star:

# Crate b, the solution array by reshaping the image to a 1D array:
b = img[good_pixels].flatten()

# Do linear least squares fit to solve Ax=b:
try:
fluxes = lsfit(A, b)
except np.linalg.LinAlgError:
logger.debug("Linear PSF Fitting failed")
fluxes = None

# Pass result if fit did not fail:
if fluxes is None:
logger.warning("We should flag that this has not gone well.")
self.lightcurve['flux'][k] = np.NaN
self.lightcurve['flux_err'][k] = np.NaN
self.lightcurve['quality'][k] |= 1 # FIXME: Use the real flag!

else:
# Get flux of target star:
target_flux = fluxes[staridx]
logger.debug('Target flux: %f', target_flux)

# Calculate final best fit image and residuals:
result = np.column_stack((cat['row_stamp'], cat['column_stamp'], fluxes))
residuals = img - best_fit

# Perform aperture photometry on the residuals:
flux_ap = nansum(residuals[mini_aperture])
logger.debug("Aperture correction: %f%%", 100*flux_ap/target_flux)
result += flux_ap

# Add the result of the main star to the lightcurve:
self.lightcurve['flux'][k] = target_flux
self.lightcurve['flux_err'][k] = np.NaN # FIXME: Add errors!

# Add current fitted fluxes for contamination calculation:
fluxes_sum += fluxes

# Make plots for debugging:
if self.plot and logger.isEnabledFor(logging.DEBUG):
fig = plt.figure()

# Add subplots with the image, fit and residuals:
ax_list = plot_image_fit_residuals(
fig=fig,
image=img,
fit=best_fit,
residuals=residuals
)

# Add star position to the first plot:
ax_list.scatter(result[staridx], result[staridx], c='r', alpha=0.5)

# Plot outline of mini-aperture:
plot_outline(mini_aperture, ax=ax_list, color='k', lw=2)

# Save figure to file:
fig_name = f'tess_{self.starid:011d}_linpsf_{k:05d}'
save_figure(os.path.join('.', fig_name), fig=fig)
plt.close(fig)

# Set contamination to NaN if all flux values are NaN:
if allnan(self.lightcurve['flux']):
self.report_details(error='All target flux values are NaN.')
return STATUS.ERROR

# Divide by number of added fluxes to get the mean flux:
fluxes_mean = fluxes_sum / np.sum(~np.isnan(self.lightcurve['flux']))
logger.debug('Mean fluxes are: %s', fluxes_mean)

# Calculate contamination from other stars in target PSF using latest A:
not_target_star = np.arange(len(fluxes_mean)) != staridx
contamination = np.sum(A[:,not_target_star].dot(fluxes_mean[not_target_star]) * A[:,staridx]) / fluxes_mean[staridx]

logger.info("Contamination: %f", contamination)