Source code for corrections.cbv_corrector.CBVCorrector

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Correct lightcurves using Cotrending Basis Vectors.

.. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk>
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

import numpy as np
import os
import logging
from astropy.io import fits
from bottleneck import nanmedian, nanstd
#from scipy.stats import norm
from ..plots import plt
from .. import BaseCorrector, STATUS
from .cbv import CBV

#--------------------------------------------------------------------------------------------------
[docs] class CBVCorrector(BaseCorrector): """ The CBV (Co-trending Basis Vectors) correction method for the TASOC photometry pipeline. The CBVCorrector inherits functionality of :py:class:`BaseCorrector`. Attributes: cbvs (dict): Dictionary of ``CBV`` objects. .. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk> .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """
[docs] def __init__(self, *args, **kwargs): """ Initialise the CBVCorrector. .. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk> .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ # Call the parent initializing: # This will set several default settings super().__init__(*args, **kwargs) # Dictionary that will hold CBV objects: self.cbvs = {}
#----------------------------------------------------------------------------------------------
[docs] def do_correction(self, lc, use_prior=False): """ Function where the correction is called, and where additional headers for the FITS are defined Parameters: lc (`TessLightCurve`): Lightcurve to correct. use_prior (boolean, optional): Use prior in fitting of CBVs. Default=False. Returns: tuple: - `TessLightcurve`: Corrected lightcurve. - `STATUS`: Status of the correction. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> .. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk> """ logger = logging.getLogger(__name__) logger.info('Co-trending star with TIC ID: %d', lc.targetid) sector = lc.meta['task']['sector'] cadence = lc.meta['task']['cadence'] cbv_area = lc.meta['task']['cbv_area'] datasource = lc.meta['task']['datasource'][0:3] # Load the CBV (and Prior) from memory and if it is not already loaded, # load it in from file and keep it in memory for next time: cbv_key = (sector, cadence, cbv_area) cbv = self.cbvs.get(cbv_key) if cbv is None: logger.debug("Loading CBV for area %d into memory", cbv_area) cbv = CBV(os.path.join(self.data_folder, f'cbv-s{sector:04d}-c{cadence:04d}-a{cbv_area:d}.hdf5')) self.cbvs[cbv_key] = cbv if use_prior and cbv.priors is None: raise IOError('Trying to co-trend without a defined prior') # Update maximum number of components n_components = cbv.cbv.shape[1] logger.info('Fitting using number of components: %d', n_components) # Special treatment when not having a CBV created for this cadence: # Use the fit of the FFI as a "prior" and do a weighted fit, where # the FFI coefficients and high-cadence coefficients are combined. # TODO: This is still experimental! if use_prior and datasource == 'tpf': # Find the corrected lightcurve for the same target, observed in FFI: star_ffi = self.search_database( select='diagnostics_corr.lightcurve,corr_status', join='LEFT JOIN diagnostics_corr ON diagnostics_corr.priority=todolist.priority', search=['todolist.starid=%d' % lc.targetid, "datasource='ffi'"], limit=1)[0] print(star_ffi) if star_ffi['corr_status'] is None: raise ValueError("Star has not been processed with FFI data yet") #if star_ffi['corr_status'] == STATUS.WARNING: # status = STATUS.WARNING #if star_ffi['corr_status'] not in (STATUS.OK, STATUS.WARNING) or star_ffi['lightcurve'] is None: # logger.warning() # status = STATUS.WARNING # alpha = 0 # Load CBV coefficients fitted to the FFI data: ffi_path = os.path.join(self.input_folder, star_ffi['lightcurve']) with fits.open(ffi_path, mode='readonly', memmap=True) as hdu: Ncbvs_ffi = hdu[1].header.get('CBV_NUM', hdu[1].header['CBV_COMP']) coeff_ffi = np.array([hdu[1].header['CBV_C%d' % (k+1)] for k in range(Ncbvs_ffi)]) offset_ffi = hdu[1].header['CBV_C0'] median_flux = nanmedian(lc.flux) flux_filter_ffi = cbv.mdl(np.append(coeff_ffi, offset_ffi)) * median_flux # Fit the interpolated CBV to the high cadence data using LS: flux_filter_interp, coeff_interp, diagnostics = cbv.fit(lc, cbvs=Ncbvs_ffi, use_bic=False, use_prior=False) # Separate coefficients and offset = coeff_interp[-1] coeff_interp = coeff_interp[:Ncbvs_ffi] logger.debug("OFFSET = %f", offset) # Weighting function of FFI vs SC: # For alpha=0, the fit to high-cadence data is weighted up. # For alpha=1, the fit to FFI is weighted high. #alpha = 1 - max(1, lc.meta['task']['variability'])**-2 #alpha = min(1, max( 2*lc.meta['task']['rms_hour']/lc.meta['task']['ptp'] - 1, 0)) flux = lc.flux / lc.meta['task']['mean_flux'] indx = np.isfinite(flux) p = np.polyfit(lc.time[indx], flux[indx], 3) fpol = flux - np.polyval(p, lc.time) ptp = nanmedian(np.abs(np.diff(fpol))) variability = nanstd(fpol) / ptp alpha = 1 - 1/max(1, variability)**2 #alpha = norm.cdf(variability, 2.5, 0.5) logger.debug("ALPHA = %f", alpha) res = (1 - alpha)*coeff_interp + alpha*coeff_ffi # The constant offset is determined solely from the high cadence fit: res = np.append(res, offset) # Final weighted filter-lightcurve: flux_filter = cbv.mdl(res) * median_flux # Set that the diagnostics['method'] = 'WLS' if self.plot: fig = plt.figure() ax = fig.add_subplot(111) ax.scatter(lc.time, lc.flux, c='k', alpha=0.3, s=2, label='Raw lightcurve') ax.plot(lc.time, flux_filter_ffi, label='FFI fit') ax.plot(lc.time, flux_filter_interp, label='SC fit') ax.plot(lc.time, flux_filter, label='Final fit') ax.set_xlabel('Time (TBJD)') ax.set_ylabel('Flux (%s)' % lc.flux_unit) ax.legend() ax.set_title(f'TIC {lc.targetid:d} - $\\alpha = {alpha:.3f}$') filename = 'tess%011d-cbv_corr-tpf.png' % lc.targetid fig.savefig(os.path.join(self.plot_folder(lc), filename)) plt.close(fig) else: # Run the standard fitting, using BIC, and optionally also using prior: flux_filter, res, diagnostics = cbv.fit(lc, use_bic=True, use_prior=use_prior) # Corrected light curve in ppm lc_corr = 1e6*(lc.copy()/flux_filter - 1) # Make sure all NaN fluxes have corresponding NaN errors: lc_corr.flux_err[np.isnan(lc_corr.flux)] = np.NaN # Defining FITS headers: no_cbvs_fitted = int((len(res)-1)/2) res = np.array([res,]).flatten() lc_corr.meta['additional_headers']['CBV_AREA'] = (cbv_area, 'CBV area of star') lc_corr.meta['additional_headers']['CBV_MET'] = (diagnostics['method'], 'Method used to fit CBVs') lc_corr.meta['additional_headers']['CBV_BIC'] = (diagnostics['use_bic'], 'Was BIC used to select no of CBVs') lc_corr.meta['additional_headers']['CBV_PRI'] = (diagnostics['use_prior'], 'Was prior used') lc_corr.meta['additional_headers']['CBV_MAX'] = (n_components, 'Number of possible CBVs to fit') lc_corr.meta['additional_headers']['CBV_NUM'] = (no_cbvs_fitted, 'Number of fitted CBVs') lc_corr.meta['additional_headers']['CBV_C0'] = (res[-1], 'Fitted offset') for ii in range(no_cbvs_fitted): lc_corr.meta['additional_headers']['CBV_C%d' % (ii+1)] = (res[ii], 'CBV%d coefficient' % (ii+1)) for jj in range(no_cbvs_fitted): lc_corr.meta['additional_headers']['CBVS_C%d' % (jj+1)] = (res[jj+no_cbvs_fitted], 'Spike-CBV%d coefficient' % (jj+1)) # Set the status of the correction: status = STATUS.OK if len(res) < 4: # fitting and using only one CBV status = STATUS.WARNING if len(res) > 21: # fitting and using more than 10 CBVs status = STATUS.WARNING if self.plot: fig = plt.figure() ax1 = fig.add_subplot(211) ax1.plot(lc.time, lc.flux) ax1.plot(lc.time, flux_filter) if 'pc' in diagnostics: ax1.plot(lc.time, diagnostics['pc'], 'm--') ax1.set_xlabel('Time (TBJD)') ax1.set_ylabel('Flux (counts)') ax1.set_xticks([]) ax2 = fig.add_subplot(212) ax2.plot(lc_corr.time, lc_corr.flux) ax2.set_xlabel('Time (TBJD)') ax2.set_ylabel('Relative flux (ppm)') plt.tight_layout() fig.savefig(os.path.join(self.plot_folder(lc), f'tess{lc.targetid:011d}-cbv_corr.png')) plt.close(fig) return lc_corr, status