Source code for corrections.cbv_corrector.cbv

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
Cotrending Basis Vectors.

.. codeauthor:: Mikkel N. Lund <>
.. codeauthor:: Rasmus Handberg <>

import numpy as np
import os
import logging
import h5py
from import fits
from astropy.time import Time
import datetime
from bottleneck import nansum, nanmedian, allnan
from scipy.optimize import minimize, fmin_powell
from scipy.stats import norm, gaussian_kde
import functools
from ..utilities import loadPickle, fix_fits_table_headers
from ..quality import CorrectorQualityFlags
from .cbv_utilities import MAD_model, MAD_model2

def cbv_snr_test(cbv_ini, threshold_snrtest=5.0):
	logger = logging.getLogger(__name__)

#	A_signal = MAD_model(cbv_ini, axis=0)
#	A_noise = MAD_model(np.diff(cbv_ini, axis=0), axis=0)

	A_signal = np.nanstd(cbv_ini, axis=0)
	A_noise = np.nanstd(np.diff(cbv_ini, axis=0), axis=0)

	snr = 10 * np.log10( A_signal**2 / A_noise**2 )"SNR threshold used %s", threshold_snrtest)"SNR (dB) of CBVs %s", snr)

	indx_lowsnr = (snr < threshold_snrtest)

	# Never throw away first CBV
	indx_lowsnr[0] = False

	return indx_lowsnr

[docs]class CBV(object): """ Cotrending Basis Vector object. Attributes: sector (int): TESS Sector. cadence (int): TESS observing cadence in seconds. camera (int): TESS Camera (1-4). ccd (int): TESS CCD (1-4). cbv_area (int): data_rel (int): TESS Data release number. version (int): TASOC version/data release number. filepath (str): Path to file where CVB is stored. time (:class:`numpy.ndarray`): cadenceno (:class:`numpy.ndarray`): cbv (:class:`numpy.ndarray`): cbv_s (:class:`numpy.ndarray`): inifit (:class:`numpy.ndarray`): priors: .. codeauthor:: Mikkel N. Lund <> .. codeauthor:: Rasmus Handberg <> """ #----------------------------------------------------------------------------------------------
[docs] def __init__(self, filepath): logger = logging.getLogger(__name__) if not os.path.isfile(filepath): raise FileNotFoundError(f"Could not find CBV file: {filepath}") self.filepath = os.path.abspath(filepath) data_folder = os.path.abspath(os.path.dirname(filepath)) self.inifit = None with h5py.File(filepath, 'r') as hdf: self.sector = int(hdf.attrs['sector']) self.cadence = int(hdf.attrs['cadence']) self.datasource = str(hdf.attrs.get('datasource', '')) = int(hdf.attrs['camera']) self.ccd = int(hdf.attrs['ccd']) self.cbv_area = int(hdf.attrs['cbv_area']) self.data_rel = int(hdf.attrs.get('data_rel', -1)) self.ncomponents = int(hdf.attrs['Ncbvs']) self.threshold_correlation = float(hdf.attrs['threshold_correlation']) self.threshold_variability = float(hdf.attrs['threshold_variability']) self.threshold_snrtest = float(hdf.attrs['threshold_snrtest']) self.threshold_entropy = float(hdf.attrs['threshold_entropy']) self.version = str(hdf.attrs['version']) self.time = np.asarray(hdf['time']) self.cadenceno = np.asarray(hdf['cadenceno']) self.cbv = np.asarray(hdf['cbv-single-scale']) self.cbv_s = np.asarray(hdf['cbv-spike']) if 'inifit' in hdf: self.inifit = np.asarray(hdf['inifit']) # Catch for backwards capability for old CBV files: if self.datasource == '': self.datasource = 'ffi' if self.cadence == 1800 else 'tpf' # Warn about missing DATA_REL headers: if self.data_rel <= 0: logger.warning("The DATA_REL header is not available in the HDF5 file.") # Signal-to-Noise test (without actually removing any CBVs): indx_lowsnr = cbv_snr_test(self.cbv, self.threshold_snrtest) self.remove_cols(indx_lowsnr) self.priors = None priorpath = os.path.join(data_folder, f'cbv-prior-s{self.sector:04d}-c{self.cadence:04d}-a{self.cbv_area:d}.pickle') if os.path.isfile(priorpath): self.priors = loadPickle(priorpath) else:'Path to prior distance file does not exist: %s', priorpath)
[docs] def remove_cols(self, indx_lowsnr): self.cbv = self.cbv[:, ~indx_lowsnr] self.cbv_s = self.cbv_s[:, ~indx_lowsnr]
[docs] def lsfit(self, lc, Ncbvs): """ Computes the weighted least-squares solution to a linear matrix equation. Parameters: lc (:class:`LightCurve`): Lightcurve to fit. Ncbvs (int): Number of CBVs to include in fit. Returns: ndarray: Coefficients for CBV plus constant offset. """ # Make sure to remove points where CBV or FLUX is not defined: idx = np.isfinite(self.cbv[:,0]) & np.isfinite(lc.flux) & np.isfinite(lc.flux_err) # Build matrix to solve: X = np.column_stack((self.cbv[idx,:Ncbvs], self.cbv_s[idx,:Ncbvs], np.ones(np.sum(idx)))) F = lc.flux[idx] # Use the flux uncertainties as weights, by scaling the matrix and vector: # # X = X * np.abs(1/lc.flux_err[idx, np.newaxis]) F = F * np.abs(1/lc.flux_err[idx]) # Try to fit with fast pseudo-inverse method: try: return (np.linalg.pinv( except np.linalg.LinAlgError: pass except ValueError: logger = logging.getLogger(__name__) logger.exception("Error calculating pseudo-inverse solution.") # If the above method fails, try # another (but slower) implementation: try: return np.linalg.lstsq(X, F, rcond=None)[0] except np.linalg.LinAlgError: pass except ValueError: logger = logging.getLogger(__name__) logger.exception("Error calculating least-squares solution.") # If everything else fails, try doing a full (slow) non-linear optimization: # 2*N+1 because we need coefficients for both CBVs, Spike-CBVs and constant offset: logger = logging.getLogger(__name__) logger.warning("Linear optimization failed. Trying non-linear optimize as last resort.") coeff0 = np.zeros(2*Ncbvs+1, dtype='float64') coeff0[-1] = nanmedian(lc.flux) res = minimize(self.negloglike, coeff0, args=(lc,), method='Powell') if res.success: return res.x raise ValueError("Minimization was not successful: " + res.message)
[docs] def lsfit_spike(self, lc, Ncbvs): """ Computes the least-squares solution to a linear matrix equation. """ idx = np.isfinite(self.cbv_s[:,0]) & np.isfinite(lc.flux) A0 = self.cbv_s[idx,:Ncbvs] X = np.column_stack((A0, np.ones(A0.shape[0]))) F = lc.flux[idx] return (np.linalg.inv(
[docs] def mdl(self, coeffs): """ Model lightcurve given CBV coefficients. Parameters: coeffs (ndarray): CBV coefficients and constant offset. Should be of length 2*N+1, where the first N coefficients are for the CBVs, the next N are for the Spike-CBVs, and the last element is a constant offset. Returns: ndarray: Model lightcurve, given the CBV coefficients provided. Will be in relative flux around 1. """ coeffs = np.atleast_1d(coeffs) Ncbvs = int((len(coeffs)-1)/2) # Build the model # Start with "ones" since we are working in relative flux around 1 m = np.ones(self.cbv.shape[0], dtype='float64') for k in range(Ncbvs): m += coeffs[k] * self.cbv[:, k] # CBV m += coeffs[k+Ncbvs] * self.cbv_s[:, k] # Spike-CBV return m + coeffs[-1]
[docs] def mdl_spike(self, coeffs): coeffs = np.atleast_1d(coeffs) m = np.ones(self.cbv.shape[0], dtype='float64') for k in range(len(coeffs)-1): m += coeffs[k] * self.cbv_s[:, k] return m + coeffs[-1]
[docs] def mdl_off(self, coeff, fitted, Ncbvs): fitted = np.atleast_1d(fitted) # Start with ones as the flux is median normalised m = np.ones(self.cbv.shape[0], dtype='float64') for k in range(Ncbvs): m += fitted[k] * self.cbv[:, k] m += fitted[k+Ncbvs] * self.cbv_s[:, k] return m + coeff
[docs] def mdl1d(self, coeff, ncbv): cbv_comb = np.column_stack((self.cbv, self.cbv_s)) m = 1 + coeff * cbv_comb[:, ncbv] return m
[docs] @np.errstate(invalid='ignore') def negloglike(self, coeffs, lc): """ Negative log-likelihood function. Parameters: coeffs (ndarray): CBV coefficients. lc (:class:`LightCurve`): Lightcurve to be fitted. Should be in relative flux around 1. Returns: float: The negative log-likelihood of the coefficients, given the lightcurve. """ return -1 * nansum(norm.logpdf(lc.flux, self.mdl(coeffs), lc.flux_err))
#---------------------------------------------------------------------------------------------- @np.errstate(invalid='ignore') def _lhood_off(self, coeffs, lc, fitted, Ncbvs): return -1 * nansum(norm.logpdf(lc.flux, self.mdl_off(coeffs, fitted, Ncbvs), lc.flux_err)) #---------------------------------------------------------------------------------------------- # def _lhood_off_2(self, coeffs, flux, err, fitted): # return 0.5*nansum(((flux - self.mdl_off(coeffs, fitted))/err)**2) + 0.5*np.log(err**2) #---------------------------------------------------------------------------------------------- @np.errstate(invalid='ignore') def _lhood1d(self, coeff, lc, ncbv): return -1 * nansum(norm.logpdf(lc.flux, self.mdl1d(coeff, ncbv), lc.flux_err)) #---------------------------------------------------------------------------------------------- # def _lhood1d_2(self, coeff, flux, err, ncbv): # return 0.5*nansum(((flux - self.mdl1d(coeff, ncbv))/err)**2) + 0.5*np.log(err**2) #---------------------------------------------------------------------------------------------- def _posterior1d(self, coeff, flux, ncbv, pos, wscale, KDE): return self._lhood1d(coeff, flux, ncbv) - wscale*np.log(self._prior1d(coeff, KDE)) #---------------------------------------------------------------------------------------------- # def _posterior1d_2(self, coeff, flux, err, ncbv, pos, wscale, KDE): # Post = self._lhood1d_2(coeff, flux, err, ncbv) + self._prior1d(coeff, wscale, KDE) # return Post #---------------------------------------------------------------------------------------------- def _priorcurve(self, x, Ncbvs, N_neigh): """ Get "most likely" correction from peak in prior distributions of fitting coefficients .. codeauthor:: Mikkel N. Lund <> """ res = np.zeros_like(self.cbv[:, 0], dtype='float64') opts = np.zeros(int(Ncbvs*2)) # entries for both CBV and CBV_S no_cbv_coeff = self.cbv.shape[1] tree = self.priors dist, ind = tree.query(np.array([x]), k=N_neigh+1) W = 1/dist[0][1::]**2 for ncbv in range(Ncbvs): V = self.inifit[ind,1+ncbv][0][1::] VS = self.inifit[ind,1+ncbv + no_cbv_coeff][0][1::] KDE = gaussian_kde(V, weights=W.flatten(), bw_method='scott') KDES = gaussian_kde(VS, weights=W.flatten(), bw_method='scott') def kernel_opt(x): return -1*KDE.logpdf(x) opt = fmin_powell(kernel_opt, 0, disp=0) opts[ncbv] = opt def kernel_opts(x): return -1*KDES.logpdf(x) opt_s = fmin_powell(kernel_opts, 0, disp=0) opts[ncbv + Ncbvs] = opt_s res += (self.mdl1d(opt, ncbv) - 1) + (self.mdl1d(opt_s, ncbv + no_cbv_coeff) - 1) return res+1, opts #---------------------------------------------------------------------------------------------- def _prior1d(self, c, KDE=None): if KDE is None: return 0 else: return float(KDE(c)) #----------------------------------------------------------------------------------------------
[docs] def fitting_lh(self, lc, Ncbvs, start_guess=None): res = self.lsfit(lc, Ncbvs) res[-1] -= 1 return res
[docs] def fitting_lh_spike(self, lc, Ncbvs, start_guess=None): res = self.lsfit_spike(lc, Ncbvs) res[-1] -= 1 return res
[docs] def fitting_pos_2(self, lc, Ncbvs, err, pos, wscale, N_neigh, logprior=None, start_guess=None): # Initial guesses for coefficients: if start_guess is not None: start_guess = np.append(start_guess, 0) else: start_guess = np.zeros(2*Ncbvs+1, dtype='float64') res = np.zeros(int(Ncbvs*2), dtype='float64') tree = self.priors dist, ind = tree.query(np.array([pos]), k=N_neigh+1) W = 1/dist[0][1::]**2 # Define posterior function to be minimized: def neglogposterior(coeff): return self.negloglike(coeff, lc) - logprior(coeff) # # # # TEST with independent spike fit # ff = pchip_interpolate(np.arange(len(flux))[np.isfinite(flux)], flux[np.isfinite(flux)], np.arange(len(flux))) # F = sig.medfilt(ff, 15) # flux2 = np.copy(flux)/F # res_s = self.fitting_lh_spike(flux2, Ncbvs) # res[Ncbvs:] = res_s[:-1] # spike_filt = self.mdl_spike(res_s) # ## plt.figure() ## plt.plot(flux2) ## plt.plot(spike_filt-0.5) # # # flux1 = F*(flux2 - spike_filt + 1) # plt.figure() # plt.plot(flux, 'b') # plt.plot(F-0.1, 'k') # plt.plot(flux1+0.1, 'r') # plt.plot(flux2 - spike_filt + 1.5, 'g') # # # # sys.exit() for jj in range(Ncbvs): V = self.inifit[ind,1+jj][0][1::] KDE = gaussian_kde(V, weights=W.flatten(), bw_method='scott') # VS = self.inifit[ind,1+jj + no_cbv_coeff][0][1::] # KDES = gaussian_kde(VS, weights=W.flatten(), bw_method='scott') # res[jj] = minimize(self._posterior1d_2, coeffs0[jj], args=(flux, err, jj, pos, wscale, KDE), method='Powell').x res[jj] = minimize(self._posterior1d, start_guess[jj], args=(lc, jj, pos, wscale, KDE), method='Powell').x # Using KDE prior: # res[jj + Ncbvs] = minimize(self._posterior1d, coeffs0[jj + Ncbvs], args=(flux, jj + no_cbv_coeff, pos, wscale, KDES), method=method).x # Using flat prior # res[jj + Ncbvs] = minimize(self._posterior1d, coeffs0[jj + Ncbvs], args=(flux1, jj + no_cbv_coeff, pos, wscale, None), method=method).x # offset = minimize(self._lhood_off_2, coeffs0[-1], args=(flux, err, res), method='Powell').x offset = minimize(self._lhood_off, start_guess[-1], args=(lc, res, Ncbvs), method='Powell').x res = np.append(res, offset) return res
#---------------------------------------------------------------------------------------------- def _fit(self, lc, err=None, Numcbvs=None, sigma_clip=4.0, maxiter=50, use_bic=True, logprior=None, start_guess=None): """ Will do scaling of lightcurve to relative flux and perform an iterative fit using sigma-clipping of outliers. Parameters: lc (:class:`LightCurve`): Lightcurve to be fitted. Numcbvs (int, optional): Maximum number of CBVs to use in fit. If ``None`` (Default), all CBVs are considered. If ``use_bic`` is False, this is the number of CBVs used. use_bic (bool, optional): Use the Bayesian Information Criterion to select the best number of CBVs to use. Default=True. sigma_clip (float, optional): Sigma-clipping limit around model to ignore points. Default=4.0. maxiter (int, optional): Maximum number of iterations to do for sigma-clipping. A warning is issued if this is reached. Default=50. Returns: tuple: - ndarray: Flux of the final CBV model. - ndarray: CBV coefficients. .. codeauthor:: Rasmus Handberg <> """ logger = logging.getLogger(__name__) # If no number of CBVs are specified, # use the full set of CBVs: if Numcbvs is None: Numcbvs = self.cbv.shape[1] # Function to use for fitting. if logprior: fitfunc = functools.partial(self.fitting_pos_2, err=err, logprior=logprior) else: fitfunc = self.fitting_lh # Find the median flux to normalise light curve median_flux = nanmedian(lc.flux) # Figure out how many CBVs to loop over: if use_bic: # Prepare variables for storing results: bic = np.array([]) solutions = [] # Test a range of CBVs from 1 to Numcbvs # Fitting at least 1 CBV and an offset Nstart = 1 else: # Test only fit with Numcbvs Nstart = Numcbvs # Loop over the different number of CBVs to attempt: for Ncbvs in range(Nstart, Numcbvs+1): lci = lc.copy() / median_flux for iters in range(maxiter): # Do the fit: res = fitfunc(lci, Ncbvs, start_guess=start_guess) # Break if nothing changes if iters == 0: res0 = res else: d = np.sum(res0 - res) res0 = res if d == 0: break flux_filter = self.mdl(res) # Do robust sigma clipping: absdev = np.abs(lci.flux - flux_filter) mad = MAD_model(absdev) indx = np.greater(absdev, sigma_clip*mad, where=np.isfinite(absdev)) if np.any(indx): lci.flux[indx] = np.nan lci.flux_err[indx] = np.nan else: break else: logger.warning("Reached maximum number of iterations in CBV fit") if use_bic: # Calculate the Bayesian Information Criterion (BIC) and store the solution: mybic = len(res)*np.log(np.sum(np.isfinite(lci.flux))) + 2*self.negloglike(res, lci) # TODO: lc or lci?!?! bic = np.append(bic, mybic) solutions.append(res) if use_bic:'bic %s', bic)'iters %s', iters+1) # Use the solution which minimizes the BIC: indx = np.argmin(bic)'optimum number of CBVs %s', indx+1) res_final = solutions[indx] flux_filter = self.mdl(res_final) * median_flux else: res_final = res flux_filter = self.mdl(res_final) * median_flux return flux_filter, res_final #----------------------------------------------------------------------------------------------
[docs] def fit(self, lc, use_bic=True, use_prior=False, cbvs=None, alpha=1.3, WS_lim=0.5, N_neigh=1000): """ Fit the CBV object to a lightcurve, and return the fitted cotrending-lightcurve and the fitting coefficients. Parameters: lc (:class:`LightCurve`): Lightcurve to be cotrended. use_bic (bool, optional): Use the Bayesian Information Criterion to find the optimal number of CBVs to fit. Default=True. use_prior (bool, optional): cbvs (int, optional): Number of CBVs to fit to lightcurve. If `use_bic=True`, this indicated the maximum number of CBVs to fit. Returns: - `numpy.array`: Fitted lightcurve with the same length as `lc`. - list: Coefficients for each CBV. - dict: Diagnostics information about the fitting. """ logger = logging.getLogger(__name__) # If no uncertainties are provided, fill it with ones: if allnan(lc.flux_err): lc.flux_err[:] = 1 # Remove bad data based on quality if not allnan(lc.quality): flag_good = CorrectorQualityFlags.filter(lc.quality) lc.flux[~flag_good] = np.nan lc.flux_err[~flag_good] = np.nan # Diagnostics to return at the end about what was # actually used in the fitting: diagnostics = { 'method': None, 'use_bic': use_bic, 'use_prior': use_prior } # Fit the CBV to the flux: if use_prior: # Do fits including prior information from the initial fits # allow switching to a simple LSSQ fit depending on # variability measures (not fully implemented yet!) # Position of target in multidimentional prior space: row = lc.meta['task']['pos_row'] col = lc.meta['task']['pos_column'] tmag = np.clip(lc.meta['task']['tmag'], 2, 20) pos = np.array([row, col, tmag]) # Prior curve n_components =[1] pc0, opts = self._priorcurve(pos, n_components, N_neigh) pc = pc0 * lc.meta['task']['mean_flux'] # Compute new variability measure idx = np.isfinite(lc.flux) polyfit = np.polyval(np.polyfit(lc.time[idx], lc.flux[idx], 3), lc.time) residual = MAD_model(lc.flux - pc) #residual_ratio = MAD_model(lc.flux-lc.meta['task']['mean_flux'])/residual #WS = np.min([1, residual_ratio]) AA = 2 GRAW = np.std((pc - polyfit) / MAD_model2(lc.flux - polyfit) - 1) GPR = 0 + (1 - (GRAW/AA)**2) * (GRAW < AA) beta1 = 1 beta2 = 1 VAR = np.nanstd(lc.flux - polyfit) WS = np.min([1, (VAR**beta1)*(GPR**beta2)]) if WS > WS_lim: logger.debug('Fitting using LLSQ') flux_filter, res = self._fit(lc, Numcbvs=5, use_bic=use_bic) # use smaller number of CBVs diagnostics['method'] = 'LS' diagnostics['use_prior'] = False diagnostics['use_bic'] = False else: logger.debug('Fitting using Priors') # Define multi-dimentional prior: dist, ind = self.priors.query(pos, k=N_neigh+1) W = 1/dist[0][1:]**2 V = self.inifit[ind[1:], :] KDE = gaussian_kde(V, weights=W.flatten(), bw_method='scott') wscale = 1.0 def logprior(coeff): return wscale * KDE.logpdf(coeff) flux_filter, res = self._fit(lc, err=residual, use_bic=use_bic, logprior=logprior, start_guess=opts) diagnostics.update({ 'method': 'MAP', 'residual': residual, 'WS': WS, 'pc': pc }) else: # Do "simple" LSSQ fits using BIC to decide on number of CBVs to include logger.debug('Fitting TIC %d using LLSQ', lc.targetid) flux_filter, res = self._fit(lc, Numcbvs=cbvs, use_bic=use_bic) diagnostics['method'] = 'LS' return flux_filter, res, diagnostics
[docs] def save_to_fits(self, output_folder, version=6): """ Save CBVs to FITS file. Parameters: output_folder (str): Path to directory where FITS file should be saved. version (int): Data release number to add to file header. Returns: str: Path to the generated FITS file. Raises: FileNotFoundError: If `output_folder` is invalid. .. codeauthor:: Rasmus Handberg <> .. codeauthor:: Nicholas Saunders <> """ # Checks of input: if not os.path.isdir(output_folder): raise FileNotFoundError("Invalid output directory") # Store the date that the file is created now = # Timestamps of start and end of timeseries: tdel = self.cadence/86400 tstart = self.time[0] - tdel/2 tstop = self.time[-1] + tdel/2 tstart_tm = Time(tstart, 2457000, format='jd', scale='tdb') tstop_tm = Time(tstop, 2457000, format='jd', scale='tdb') telapse = tstop - tstart # write fits header information hdr = fits.Header() hdr['EXTNAME'] = ('PRIMARY', 'extension name') hdr['ORIGIN'] = ('TASOC/Aarhus', 'institution responsible for creating this file') hdr['DATE'] = (now.strftime("%Y-%m-%d"), 'file creation date') hdr['TELESCOP'] = ('TESS', 'telescope') hdr['INSTRUME'] = ('TESS Photometer', 'detector type') hdr['SECTOR'] = (self.sector, 'Observing sector') hdr['DATA_REL'] = (self.data_rel, 'data release version number') hdr['VERSION'] = (version, 'TASOC data release version number') hdr['CADENCE'] = (self.cadence, '[s] observation cadence') hdr['PROCVER'] = (self.version, 'Version of corrections pipeline') hdr['FILEVER'] = ('1.0', 'File format version') hdr['TSTART'] = (tstart, 'observation start time in TJD') hdr['TSTOP'] = (tstop, 'observation stop time in TJD') hdr['DATE-OBS'] = (tstart_tm.utc.isot, 'TSTART as UTC calendar date') hdr['DATE-END'] = (tstop_tm.utc.isot, 'TSTOP as UTC calendar date') hdr['CAMERA'] = (, 'CCD camera') hdr['CCD'] = (self.ccd, 'CCD chip') hdr['CBV_AREA'] = (self.cbv_area, 'CCD area') # Create primary hdu: phdu = fits.PrimaryHDU(header=hdr) # Create common table headers: table_header = fits.Header() table_header['TIMEREF'] = ('SOLARSYSTEM', 'barycentric correction applied to times') table_header['TIMESYS'] = ('TDB', 'time system is Barycentric Dynamical Time (TDB)') table_header['JDREFI'] = (2457000, 'integer part of BTJD reference date') table_header['JDREFF'] = (0.0, 'fraction of the day in BTJD reference date') table_header['TIMEUNIT'] = ('d', 'time unit for TIME, TSTART and TSTOP') table_header['TSTART'] = (tstart, 'observation start time in TJD') table_header['TSTOP'] = (tstop, 'observation stop time in TJD') table_header['DATE-OBS'] = (tstart_tm.utc.isot, 'TSTART as UTC calendar date') table_header['DATE-END'] = (tstop_tm.utc.isot, 'TSTOP as UTC calendar date') table_header['TELAPSE'] = (telapse, '[d] LC_STOP - LC_START') table_header['TIMEPIXR'] = (0.5, 'bin time beginning=0 middle=0.5 end=1') table_header['TIMEDEL'] = (tdel, '[d] time resolution of data') table_header['CAMERA'] = (, 'CCD camera') table_header['CCD'] = (self.ccd, 'CCD chip') table_header['CBV_AREA'] = (self.cbv_area, 'CCD area') # Settings used when generating the CBV: table_header['THR_COR'] = (self.threshold_correlation, 'Fraction of stars used for CBVs') table_header['THR_VAR'] = (self.threshold_variability, 'Threshold for variability rejection') table_header['THR_SNR'] = (self.threshold_snrtest, 'Threshold for SNR test') table_header['THR_ENT'] = (self.threshold_entropy, 'Threshold for entropy cleaning') # Columns that are common between all tables: col_time = fits.Column(name='TIME', format='D', unit='JD - 2457000, days', disp='D17.7', array=self.time) col_cadno = fits.Column(name='CADENCENO', format='J', disp='I10', array=self.cadenceno) # Single-scale CBVs: cols = [col_time, col_cadno] col_titles = {} # store all CBVs for each camera and chip for n in range(self.cbv.shape[1]): col_name = 'VECTOR_%d' % (n+1) col = fits.Column(name=col_name, format='E', disp='F8.5', array=self.cbv[:, n]) cols.append(col) col_titles[col_name] = 'column title: co-trending basis vector %d' % (n+1) # append CBVs as hdu columns table_hdu1 = fits.BinTableHDU.from_columns(cols, header=table_header, name=f'CBV.SINGLE-SCALE.{self.cbv_area:d}') # Fix table headers: fix_fits_table_headers(table_hdu1, column_titles=col_titles) table_hdu1.header.comments['TTYPE1'] = 'column title: data time stamps' table_hdu1.header.comments['TUNIT1'] = 'column units: TESS modified Julian date (TJD)' table_hdu1.header.comments['TTYPE2'] = 'column title: unique cadence number' # Spike CBVs: cols = [col_time, col_cadno] col_titles = {} # store all CBVs for each camera and chip for n in range(self.cbv_s.shape[1]): col_name = 'VECTOR_%d' % (n+1) col = fits.Column(name=col_name, format='E', disp='F8.5', array=self.cbv_s[:, n]) col_titles[col_name] = 'column title: co-trending basis vector %d' % (n+1) cols.append(col) # append CBVs as hdu columns table_hdu2 = fits.BinTableHDU.from_columns(cols, header=table_header, name=f'CBV.SPIKE.{self.cbv_area:d}') # Fix table headers: fix_fits_table_headers(table_hdu2, column_titles=col_titles) table_hdu2.header.comments['TTYPE1'] = 'column title: data time stamps' table_hdu2.header.comments['TUNIT1'] = 'column units: TESS modified Julian date (TJD)' table_hdu2.header.comments['TTYPE2'] = 'column title: unique cadence number' # Name of the fname = f'tess-s{self.sector:04d}-c{self.cadence:04d}-a{self.cbv_area:d}-v{version:d}-tasoc_cbv.fits.gz' filepath = os.path.join(output_folder, fname) # store as HDU list and write to fits file with fits.HDUList([phdu, table_hdu1, table_hdu2]) as hdul: hdul.writeto(filepath, overwrite=True, checksum=True) return filepath