#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Cotrending Basis Vectors.
.. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk>
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
import numpy as np
import os
import logging
import h5py
from astropy.io import fits
from astropy.time import Time
import datetime
from bottleneck import nansum, nanmedian, allnan
from scipy.optimize import minimize, fmin_powell
from scipy.stats import norm, gaussian_kde
import functools
from ..utilities import loadPickle, fix_fits_table_headers
from ..quality import CorrectorQualityFlags
from .cbv_utilities import MAD_model, MAD_model2
#--------------------------------------------------------------------------------------------------
def cbv_snr_test(cbv_ini, threshold_snrtest=5.0):
logger = logging.getLogger(__name__)
# A_signal = MAD_model(cbv_ini, axis=0)
# A_noise = MAD_model(np.diff(cbv_ini, axis=0), axis=0)
A_signal = np.nanstd(cbv_ini, axis=0)
A_noise = np.nanstd(np.diff(cbv_ini, axis=0), axis=0)
snr = 10 * np.log10( A_signal**2 / A_noise**2 )
logger.info("SNR threshold used %s", threshold_snrtest)
logger.info("SNR (dB) of CBVs %s", snr)
indx_lowsnr = (snr < threshold_snrtest)
# Never throw away first CBV
indx_lowsnr[0] = False
return indx_lowsnr
#--------------------------------------------------------------------------------------------------
[docs]
class CBV(object):
"""
Cotrending Basis Vector object.
Attributes:
sector (int): TESS Sector.
cadence (int): TESS observing cadence in seconds.
camera (int): TESS Camera (1-4).
ccd (int): TESS CCD (1-4).
cbv_area (int):
data_rel (int): TESS Data release number.
version (int): TASOC version/data release number.
filepath (str): Path to file where CVB is stored.
time (:class:`numpy.ndarray`):
cadenceno (:class:`numpy.ndarray`):
cbv (:class:`numpy.ndarray`):
cbv_s (:class:`numpy.ndarray`):
inifit (:class:`numpy.ndarray`):
priors:
.. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk>
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
#----------------------------------------------------------------------------------------------
[docs]
def __init__(self, filepath):
logger = logging.getLogger(__name__)
if not os.path.isfile(filepath):
raise FileNotFoundError(f"Could not find CBV file: {filepath}")
self.filepath = os.path.abspath(filepath)
data_folder = os.path.abspath(os.path.dirname(filepath))
self.inifit = None
with h5py.File(filepath, 'r') as hdf:
self.sector = int(hdf.attrs['sector'])
self.cadence = int(hdf.attrs['cadence'])
self.datasource = str(hdf.attrs.get('datasource', ''))
self.camera = int(hdf.attrs['camera'])
self.ccd = int(hdf.attrs['ccd'])
self.cbv_area = int(hdf.attrs['cbv_area'])
self.data_rel = int(hdf.attrs.get('data_rel', -1))
self.ncomponents = int(hdf.attrs['Ncbvs'])
self.threshold_correlation = float(hdf.attrs['threshold_correlation'])
self.threshold_variability = float(hdf.attrs['threshold_variability'])
self.threshold_snrtest = float(hdf.attrs['threshold_snrtest'])
self.threshold_entropy = float(hdf.attrs['threshold_entropy'])
self.version = str(hdf.attrs['version'])
self.time = np.asarray(hdf['time'])
self.cadenceno = np.asarray(hdf['cadenceno'])
self.cbv = np.asarray(hdf['cbv-single-scale'])
self.cbv_s = np.asarray(hdf['cbv-spike'])
if 'inifit' in hdf:
self.inifit = np.asarray(hdf['inifit'])
# Catch for backwards capability for old CBV files:
if self.datasource == '':
self.datasource = 'ffi' if self.cadence == 1800 else 'tpf'
# Warn about missing DATA_REL headers:
if self.data_rel <= 0:
logger.warning("The DATA_REL header is not available in the HDF5 file.")
# Signal-to-Noise test (without actually removing any CBVs):
indx_lowsnr = cbv_snr_test(self.cbv, self.threshold_snrtest)
self.remove_cols(indx_lowsnr)
self.priors = None
priorpath = os.path.join(data_folder, f'cbv-prior-s{self.sector:04d}-c{self.cadence:04d}-a{self.cbv_area:d}.pickle')
if os.path.isfile(priorpath):
self.priors = loadPickle(priorpath)
else:
logger.info('Path to prior distance file does not exist: %s', priorpath)
#----------------------------------------------------------------------------------------------
[docs]
def remove_cols(self, indx_lowsnr):
self.cbv = self.cbv[:, ~indx_lowsnr]
self.cbv_s = self.cbv_s[:, ~indx_lowsnr]
#----------------------------------------------------------------------------------------------
[docs]
def lsfit(self, lc, Ncbvs):
"""
Computes the weighted least-squares solution to a linear matrix equation.
Parameters:
lc (:class:`LightCurve`): Lightcurve to fit.
Ncbvs (int): Number of CBVs to include in fit.
Returns:
ndarray: Coefficients for CBV plus constant offset.
"""
# Make sure to remove points where CBV or FLUX is not defined:
idx = np.isfinite(self.cbv[:,0]) & np.isfinite(lc.flux) & np.isfinite(lc.flux_err)
# Build matrix to solve:
X = np.column_stack((self.cbv[idx,:Ncbvs], self.cbv_s[idx,:Ncbvs], np.ones(np.sum(idx))))
F = lc.flux[idx]
# Use the flux uncertainties as weights, by scaling the matrix and vector:
# https://en.wikipedia.org/wiki/Weighted_least_squares
# https://stackoverflow.com/questions/27128688/how-to-use-least-squares-with-weight-matrix
X = X * np.abs(1/lc.flux_err[idx, np.newaxis])
F = F * np.abs(1/lc.flux_err[idx])
# Try to fit with fast pseudo-inverse method:
try:
return (np.linalg.pinv(X.T.dot(X)).dot(X.T)).dot(F)
except np.linalg.LinAlgError:
pass
except ValueError:
logger = logging.getLogger(__name__)
logger.exception("Error calculating pseudo-inverse solution.")
# If the above method fails, try
# another (but slower) implementation:
try:
return np.linalg.lstsq(X, F, rcond=None)[0]
except np.linalg.LinAlgError:
pass
except ValueError:
logger = logging.getLogger(__name__)
logger.exception("Error calculating least-squares solution.")
# If everything else fails, try doing a full (slow) non-linear optimization:
# 2*N+1 because we need coefficients for both CBVs, Spike-CBVs and constant offset:
logger = logging.getLogger(__name__)
logger.warning("Linear optimization failed. Trying non-linear optimize as last resort.")
coeff0 = np.zeros(2*Ncbvs+1, dtype='float64')
coeff0[-1] = nanmedian(lc.flux)
res = minimize(self.negloglike, coeff0, args=(lc,), method='Powell')
if res.success:
return res.x
raise ValueError("Minimization was not successful: " + res.message)
#----------------------------------------------------------------------------------------------
[docs]
def lsfit_spike(self, lc, Ncbvs):
"""
Computes the least-squares solution to a linear matrix equation.
"""
idx = np.isfinite(self.cbv_s[:,0]) & np.isfinite(lc.flux)
A0 = self.cbv_s[idx,:Ncbvs]
X = np.column_stack((A0, np.ones(A0.shape[0])))
F = lc.flux[idx]
return (np.linalg.inv(X.T.dot(X)).dot(X.T)).dot(F)
#----------------------------------------------------------------------------------------------
[docs]
def mdl(self, coeffs):
"""
Model lightcurve given CBV coefficients.
Parameters:
coeffs (ndarray): CBV coefficients and constant offset.
Should be of length 2*N+1, where the first N coefficients are for the CBVs,
the next N are for the Spike-CBVs, and the last element is a constant offset.
Returns:
ndarray: Model lightcurve, given the CBV coefficients provided. Will be in relative
flux around 1.
"""
coeffs = np.atleast_1d(coeffs)
Ncbvs = int((len(coeffs)-1)/2)
# Build the model
# Start with "ones" since we are working in relative flux around 1
m = np.ones(self.cbv.shape[0], dtype='float64')
for k in range(Ncbvs):
m += coeffs[k] * self.cbv[:, k] # CBV
m += coeffs[k+Ncbvs] * self.cbv_s[:, k] # Spike-CBV
return m + coeffs[-1]
#----------------------------------------------------------------------------------------------
[docs]
def mdl_spike(self, coeffs):
coeffs = np.atleast_1d(coeffs)
m = np.ones(self.cbv.shape[0], dtype='float64')
for k in range(len(coeffs)-1):
m += coeffs[k] * self.cbv_s[:, k]
return m + coeffs[-1]
#----------------------------------------------------------------------------------------------
[docs]
def mdl_off(self, coeff, fitted, Ncbvs):
fitted = np.atleast_1d(fitted)
# Start with ones as the flux is median normalised
m = np.ones(self.cbv.shape[0], dtype='float64')
for k in range(Ncbvs):
m += fitted[k] * self.cbv[:, k]
m += fitted[k+Ncbvs] * self.cbv_s[:, k]
return m + coeff
#----------------------------------------------------------------------------------------------
[docs]
def mdl1d(self, coeff, ncbv):
cbv_comb = np.column_stack((self.cbv, self.cbv_s))
m = 1 + coeff * cbv_comb[:, ncbv]
return m
#----------------------------------------------------------------------------------------------
[docs]
@np.errstate(invalid='ignore')
def negloglike(self, coeffs, lc):
"""
Negative log-likelihood function.
Parameters:
coeffs (ndarray): CBV coefficients.
lc (:class:`LightCurve`): Lightcurve to be fitted. Should be in relative flux around 1.
Returns:
float: The negative log-likelihood of the coefficients, given the lightcurve.
"""
return -1 * nansum(norm.logpdf(lc.flux, self.mdl(coeffs), lc.flux_err))
#----------------------------------------------------------------------------------------------
@np.errstate(invalid='ignore')
def _lhood_off(self, coeffs, lc, fitted, Ncbvs):
return -1 * nansum(norm.logpdf(lc.flux, self.mdl_off(coeffs, fitted, Ncbvs), lc.flux_err))
#----------------------------------------------------------------------------------------------
# def _lhood_off_2(self, coeffs, flux, err, fitted):
# return 0.5*nansum(((flux - self.mdl_off(coeffs, fitted))/err)**2) + 0.5*np.log(err**2)
#----------------------------------------------------------------------------------------------
@np.errstate(invalid='ignore')
def _lhood1d(self, coeff, lc, ncbv):
return -1 * nansum(norm.logpdf(lc.flux, self.mdl1d(coeff, ncbv), lc.flux_err))
#----------------------------------------------------------------------------------------------
# def _lhood1d_2(self, coeff, flux, err, ncbv):
# return 0.5*nansum(((flux - self.mdl1d(coeff, ncbv))/err)**2) + 0.5*np.log(err**2)
#----------------------------------------------------------------------------------------------
def _posterior1d(self, coeff, flux, ncbv, pos, wscale, KDE):
return self._lhood1d(coeff, flux, ncbv) - wscale*np.log(self._prior1d(coeff, KDE))
#----------------------------------------------------------------------------------------------
# def _posterior1d_2(self, coeff, flux, err, ncbv, pos, wscale, KDE):
# Post = self._lhood1d_2(coeff, flux, err, ncbv) + self._prior1d(coeff, wscale, KDE)
# return Post
#----------------------------------------------------------------------------------------------
def _priorcurve(self, x, Ncbvs, N_neigh):
"""
Get "most likely" correction from peak in prior distributions
of fitting coefficients
.. codeauthor:: Mikkel N. Lund <mikkelnl@phys.au.dk>
"""
res = np.zeros_like(self.cbv[:, 0], dtype='float64')
opts = np.zeros(int(Ncbvs*2)) # entries for both CBV and CBV_S
no_cbv_coeff = self.cbv.shape[1]
tree = self.priors
dist, ind = tree.query(np.array([x]), k=N_neigh+1)
W = 1/dist[0][1::]**2
for ncbv in range(Ncbvs):
V = self.inifit[ind,1+ncbv][0][1::]
VS = self.inifit[ind,1+ncbv + no_cbv_coeff][0][1::]
KDE = gaussian_kde(V, weights=W.flatten(), bw_method='scott')
KDES = gaussian_kde(VS, weights=W.flatten(), bw_method='scott')
def kernel_opt(x):
return -1*KDE.logpdf(x)
opt = fmin_powell(kernel_opt, 0, disp=0)
opts[ncbv] = opt
def kernel_opts(x):
return -1*KDES.logpdf(x)
opt_s = fmin_powell(kernel_opts, 0, disp=0)
opts[ncbv + Ncbvs] = opt_s
res += (self.mdl1d(opt, ncbv) - 1) + (self.mdl1d(opt_s, ncbv + no_cbv_coeff) - 1)
return res+1, opts
#----------------------------------------------------------------------------------------------
def _prior1d(self, c, KDE=None):
if KDE is None:
return 0
else:
return float(KDE(c))
#----------------------------------------------------------------------------------------------
[docs]
def fitting_lh(self, lc, Ncbvs, start_guess=None):
res = self.lsfit(lc, Ncbvs)
res[-1] -= 1
return res
#----------------------------------------------------------------------------------------------
[docs]
def fitting_lh_spike(self, lc, Ncbvs, start_guess=None):
res = self.lsfit_spike(lc, Ncbvs)
res[-1] -= 1
return res
#----------------------------------------------------------------------------------------------
[docs]
def fitting_pos_2(self, lc, Ncbvs, err, pos, wscale, N_neigh, logprior=None, start_guess=None):
# Initial guesses for coefficients:
if start_guess is not None:
start_guess = np.append(start_guess, 0)
else:
start_guess = np.zeros(2*Ncbvs+1, dtype='float64')
res = np.zeros(int(Ncbvs*2), dtype='float64')
tree = self.priors
dist, ind = tree.query(np.array([pos]), k=N_neigh+1)
W = 1/dist[0][1::]**2
# Define posterior function to be minimized:
def neglogposterior(coeff):
return self.negloglike(coeff, lc) - logprior(coeff)
#
#
# # TEST with independent spike fit
# ff = pchip_interpolate(np.arange(len(flux))[np.isfinite(flux)], flux[np.isfinite(flux)], np.arange(len(flux)))
# F = sig.medfilt(ff, 15)
# flux2 = np.copy(flux)/F
# res_s = self.fitting_lh_spike(flux2, Ncbvs)
# res[Ncbvs:] = res_s[:-1]
# spike_filt = self.mdl_spike(res_s)
#
## plt.figure()
## plt.plot(flux2)
## plt.plot(spike_filt-0.5)
#
#
# flux1 = F*(flux2 - spike_filt + 1)
# plt.figure()
# plt.plot(flux, 'b')
# plt.plot(F-0.1, 'k')
# plt.plot(flux1+0.1, 'r')
# plt.plot(flux2 - spike_filt + 1.5, 'g')
#
#
# plt.show()
# sys.exit()
for jj in range(Ncbvs):
V = self.inifit[ind,1+jj][0][1::]
KDE = gaussian_kde(V, weights=W.flatten(), bw_method='scott')
# VS = self.inifit[ind,1+jj + no_cbv_coeff][0][1::]
# KDES = gaussian_kde(VS, weights=W.flatten(), bw_method='scott')
# res[jj] = minimize(self._posterior1d_2, coeffs0[jj], args=(flux, err, jj, pos, wscale, KDE), method='Powell').x
res[jj] = minimize(self._posterior1d, start_guess[jj], args=(lc, jj, pos, wscale, KDE), method='Powell').x
# Using KDE prior:
# res[jj + Ncbvs] = minimize(self._posterior1d, coeffs0[jj + Ncbvs], args=(flux, jj + no_cbv_coeff, pos, wscale, KDES), method=method).x
# Using flat prior
# res[jj + Ncbvs] = minimize(self._posterior1d, coeffs0[jj + Ncbvs], args=(flux1, jj + no_cbv_coeff, pos, wscale, None), method=method).x
# offset = minimize(self._lhood_off_2, coeffs0[-1], args=(flux, err, res), method='Powell').x
offset = minimize(self._lhood_off, start_guess[-1], args=(lc, res, Ncbvs), method='Powell').x
res = np.append(res, offset)
return res
#----------------------------------------------------------------------------------------------
def _fit(self, lc, err=None, Numcbvs=None, sigma_clip=4.0, maxiter=50, use_bic=True,
logprior=None, start_guess=None):
"""
Will do scaling of lightcurve to relative flux and perform an iterative fit using
sigma-clipping of outliers.
Parameters:
lc (:class:`LightCurve`): Lightcurve to be fitted.
Numcbvs (int, optional): Maximum number of CBVs to use in fit. If ``None`` (Default),
all CBVs are considered. If ``use_bic`` is False, this is the number of CBVs used.
use_bic (bool, optional): Use the Bayesian Information Criterion to select the best
number of CBVs to use. Default=True.
sigma_clip (float, optional): Sigma-clipping limit around model to ignore points.
Default=4.0.
maxiter (int, optional): Maximum number of iterations to do for sigma-clipping.
A warning is issued if this is reached. Default=50.
Returns:
tuple:
- ndarray: Flux of the final CBV model.
- ndarray: CBV coefficients.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
logger = logging.getLogger(__name__)
# If no number of CBVs are specified,
# use the full set of CBVs:
if Numcbvs is None:
Numcbvs = self.cbv.shape[1]
# Function to use for fitting.
if logprior:
fitfunc = functools.partial(self.fitting_pos_2, err=err, logprior=logprior)
else:
fitfunc = self.fitting_lh
# Find the median flux to normalise light curve
median_flux = nanmedian(lc.flux)
# Figure out how many CBVs to loop over:
if use_bic:
# Prepare variables for storing results:
bic = np.array([])
solutions = []
# Test a range of CBVs from 1 to Numcbvs
# Fitting at least 1 CBV and an offset
Nstart = 1
else:
# Test only fit with Numcbvs
Nstart = Numcbvs
# Loop over the different number of CBVs to attempt:
for Ncbvs in range(Nstart, Numcbvs+1):
lci = lc.copy() / median_flux
for iters in range(maxiter):
# Do the fit:
res = fitfunc(lci, Ncbvs, start_guess=start_guess)
# Break if nothing changes
if iters == 0:
res0 = res
else:
d = np.sum(res0 - res)
res0 = res
if d == 0:
break
flux_filter = self.mdl(res)
# Do robust sigma clipping:
absdev = np.abs(lci.flux - flux_filter)
mad = MAD_model(absdev)
indx = np.greater(absdev, sigma_clip*mad, where=np.isfinite(absdev))
if np.any(indx):
lci.flux[indx] = np.nan
lci.flux_err[indx] = np.nan
else:
break
else:
logger.warning("Reached maximum number of iterations in CBV fit")
if use_bic:
# Calculate the Bayesian Information Criterion (BIC) and store the solution:
mybic = len(res)*np.log(np.sum(np.isfinite(lci.flux))) + 2*self.negloglike(res, lci) # TODO: lc or lci?!?!
bic = np.append(bic, mybic)
solutions.append(res)
if use_bic:
logger.info('bic %s', bic)
logger.info('iters %s', iters+1)
# Use the solution which minimizes the BIC:
indx = np.argmin(bic)
logger.info('optimum number of CBVs %s', indx+1)
res_final = solutions[indx]
flux_filter = self.mdl(res_final) * median_flux
else:
res_final = res
flux_filter = self.mdl(res_final) * median_flux
return flux_filter, res_final
#----------------------------------------------------------------------------------------------
[docs]
def fit(self, lc, use_bic=True, use_prior=False, cbvs=None, alpha=1.3, WS_lim=0.5, N_neigh=1000):
"""
Fit the CBV object to a lightcurve, and return the fitted cotrending-lightcurve
and the fitting coefficients.
Parameters:
lc (:class:`LightCurve`): Lightcurve to be cotrended.
use_bic (bool, optional): Use the Bayesian Information Criterion to find the
optimal number of CBVs to fit. Default=True.
use_prior (bool, optional):
cbvs (int, optional): Number of CBVs to fit to lightcurve. If `use_bic=True`, this
indicated the maximum number of CBVs to fit.
Returns:
- `numpy.array`: Fitted lightcurve with the same length as `lc`.
- list: Coefficients for each CBV.
- dict: Diagnostics information about the fitting.
"""
logger = logging.getLogger(__name__)
# If no uncertainties are provided, fill it with ones:
if allnan(lc.flux_err):
lc.flux_err[:] = 1
# Remove bad data based on quality
if not allnan(lc.quality):
flag_good = CorrectorQualityFlags.filter(lc.quality)
lc.flux[~flag_good] = np.nan
lc.flux_err[~flag_good] = np.nan
# Diagnostics to return at the end about what was
# actually used in the fitting:
diagnostics = {
'method': None,
'use_bic': use_bic,
'use_prior': use_prior
}
# Fit the CBV to the flux:
if use_prior:
# Do fits including prior information from the initial fits
# allow switching to a simple LSSQ fit depending on
# variability measures (not fully implemented yet!)
# Position of target in multidimentional prior space:
row = lc.meta['task']['pos_row']
col = lc.meta['task']['pos_column']
tmag = np.clip(lc.meta['task']['tmag'], 2, 20)
pos = np.array([row, col, tmag])
# Prior curve
n_components = self.cbs.shape[1]
pc0, opts = self._priorcurve(pos, n_components, N_neigh)
pc = pc0 * lc.meta['task']['mean_flux']
# Compute new variability measure
idx = np.isfinite(lc.flux)
polyfit = np.polyval(np.polyfit(lc.time[idx], lc.flux[idx], 3), lc.time)
residual = MAD_model(lc.flux - pc)
#residual_ratio = MAD_model(lc.flux-lc.meta['task']['mean_flux'])/residual
#WS = np.min([1, residual_ratio])
AA = 2
GRAW = np.std((pc - polyfit) / MAD_model2(lc.flux - polyfit) - 1)
GPR = 0 + (1 - (GRAW/AA)**2) * (GRAW < AA)
beta1 = 1
beta2 = 1
VAR = np.nanstd(lc.flux - polyfit)
WS = np.min([1, (VAR**beta1)*(GPR**beta2)])
if WS > WS_lim:
logger.debug('Fitting using LLSQ')
flux_filter, res = self._fit(lc, Numcbvs=5, use_bic=use_bic) # use smaller number of CBVs
diagnostics['method'] = 'LS'
diagnostics['use_prior'] = False
diagnostics['use_bic'] = False
else:
logger.debug('Fitting using Priors')
# Define multi-dimentional prior:
dist, ind = self.priors.query(pos, k=N_neigh+1)
W = 1/dist[0][1:]**2
V = self.inifit[ind[1:], :]
KDE = gaussian_kde(V, weights=W.flatten(), bw_method='scott')
wscale = 1.0
def logprior(coeff):
return wscale * KDE.logpdf(coeff)
flux_filter, res = self._fit(lc, err=residual, use_bic=use_bic, logprior=logprior, start_guess=opts)
diagnostics.update({
'method': 'MAP',
'residual': residual,
'WS': WS,
'pc': pc
})
else:
# Do "simple" LSSQ fits using BIC to decide on number of CBVs to include
logger.debug('Fitting TIC %d using LLSQ', lc.targetid)
flux_filter, res = self._fit(lc, Numcbvs=cbvs, use_bic=use_bic)
diagnostics['method'] = 'LS'
return flux_filter, res, diagnostics
#----------------------------------------------------------------------------------------------
[docs]
def save_to_fits(self, output_folder, version=6):
"""
Save CBVs to FITS file.
Parameters:
output_folder (str): Path to directory where FITS file should be saved.
version (int): Data release number to add to file header.
Returns:
str: Path to the generated FITS file.
Raises:
FileNotFoundError: If `output_folder` is invalid.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
.. codeauthor:: Nicholas Saunders <nksaun@hawaii.edu>
"""
# Checks of input:
if not os.path.isdir(output_folder):
raise FileNotFoundError("Invalid output directory")
# Store the date that the file is created
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Timestamps of start and end of timeseries:
tdel = self.cadence/86400
tstart = self.time[0] - tdel/2
tstop = self.time[-1] + tdel/2
tstart_tm = Time(tstart, 2457000, format='jd', scale='tdb')
tstop_tm = Time(tstop, 2457000, format='jd', scale='tdb')
telapse = tstop - tstart
# write fits header information
hdr = fits.Header()
hdr['EXTNAME'] = ('PRIMARY', 'extension name')
hdr['ORIGIN'] = ('TASOC/Aarhus', 'institution responsible for creating this file')
hdr['DATE'] = (now.strftime("%Y-%m-%d"), 'file creation date')
hdr['TELESCOP'] = ('TESS', 'telescope')
hdr['INSTRUME'] = ('TESS Photometer', 'detector type')
hdr['SECTOR'] = (self.sector, 'Observing sector')
hdr['DATA_REL'] = (self.data_rel, 'data release version number')
hdr['VERSION'] = (version, 'TASOC data release version number')
hdr['CADENCE'] = (self.cadence, '[s] observation cadence')
hdr['PROCVER'] = (self.version, 'Version of corrections pipeline')
hdr['FILEVER'] = ('1.0', 'File format version')
hdr['TSTART'] = (tstart, 'observation start time in TJD')
hdr['TSTOP'] = (tstop, 'observation stop time in TJD')
hdr['DATE-OBS'] = (tstart_tm.utc.isot, 'TSTART as UTC calendar date')
hdr['DATE-END'] = (tstop_tm.utc.isot, 'TSTOP as UTC calendar date')
hdr['CAMERA'] = (self.camera, 'CCD camera')
hdr['CCD'] = (self.ccd, 'CCD chip')
hdr['CBV_AREA'] = (self.cbv_area, 'CCD area')
# Create primary hdu:
phdu = fits.PrimaryHDU(header=hdr)
# Create common table headers:
table_header = fits.Header()
table_header['TIMEREF'] = ('SOLARSYSTEM', 'barycentric correction applied to times')
table_header['TIMESYS'] = ('TDB', 'time system is Barycentric Dynamical Time (TDB)')
table_header['JDREFI'] = (2457000, 'integer part of BTJD reference date')
table_header['JDREFF'] = (0.0, 'fraction of the day in BTJD reference date')
table_header['TIMEUNIT'] = ('d', 'time unit for TIME, TSTART and TSTOP')
table_header['TSTART'] = (tstart, 'observation start time in TJD')
table_header['TSTOP'] = (tstop, 'observation stop time in TJD')
table_header['DATE-OBS'] = (tstart_tm.utc.isot, 'TSTART as UTC calendar date')
table_header['DATE-END'] = (tstop_tm.utc.isot, 'TSTOP as UTC calendar date')
table_header['TELAPSE'] = (telapse, '[d] LC_STOP - LC_START')
table_header['TIMEPIXR'] = (0.5, 'bin time beginning=0 middle=0.5 end=1')
table_header['TIMEDEL'] = (tdel, '[d] time resolution of data')
table_header['CAMERA'] = (self.camera, 'CCD camera')
table_header['CCD'] = (self.ccd, 'CCD chip')
table_header['CBV_AREA'] = (self.cbv_area, 'CCD area')
# Settings used when generating the CBV:
table_header['THR_COR'] = (self.threshold_correlation, 'Fraction of stars used for CBVs')
table_header['THR_VAR'] = (self.threshold_variability, 'Threshold for variability rejection')
table_header['THR_SNR'] = (self.threshold_snrtest, 'Threshold for SNR test')
table_header['THR_ENT'] = (self.threshold_entropy, 'Threshold for entropy cleaning')
# Columns that are common between all tables:
col_time = fits.Column(name='TIME', format='D', unit='JD - 2457000, days', disp='D17.7', array=self.time)
col_cadno = fits.Column(name='CADENCENO', format='J', disp='I10', array=self.cadenceno)
# Single-scale CBVs:
cols = [col_time, col_cadno]
col_titles = {}
# store all CBVs for each camera and chip
for n in range(self.cbv.shape[1]):
col_name = 'VECTOR_%d' % (n+1)
col = fits.Column(name=col_name, format='E', disp='F8.5', array=self.cbv[:, n])
cols.append(col)
col_titles[col_name] = 'column title: co-trending basis vector %d' % (n+1)
# append CBVs as hdu columns
table_hdu1 = fits.BinTableHDU.from_columns(cols, header=table_header, name=f'CBV.SINGLE-SCALE.{self.cbv_area:d}')
# Fix table headers:
fix_fits_table_headers(table_hdu1, column_titles=col_titles)
table_hdu1.header.comments['TTYPE1'] = 'column title: data time stamps'
table_hdu1.header.comments['TUNIT1'] = 'column units: TESS modified Julian date (TJD)'
table_hdu1.header.comments['TTYPE2'] = 'column title: unique cadence number'
# Spike CBVs:
cols = [col_time, col_cadno]
col_titles = {}
# store all CBVs for each camera and chip
for n in range(self.cbv_s.shape[1]):
col_name = 'VECTOR_%d' % (n+1)
col = fits.Column(name=col_name, format='E', disp='F8.5', array=self.cbv_s[:, n])
col_titles[col_name] = 'column title: co-trending basis vector %d' % (n+1)
cols.append(col)
# append CBVs as hdu columns
table_hdu2 = fits.BinTableHDU.from_columns(cols, header=table_header, name=f'CBV.SPIKE.{self.cbv_area:d}')
# Fix table headers:
fix_fits_table_headers(table_hdu2, column_titles=col_titles)
table_hdu2.header.comments['TTYPE1'] = 'column title: data time stamps'
table_hdu2.header.comments['TUNIT1'] = 'column units: TESS modified Julian date (TJD)'
table_hdu2.header.comments['TTYPE2'] = 'column title: unique cadence number'
# Name of the
fname = f'tess-s{self.sector:04d}-c{self.cadence:04d}-a{self.cbv_area:d}-v{version:d}-tasoc_cbv.fits.gz'
filepath = os.path.join(output_folder, fname)
# store as HDU list and write to fits file
with fits.HDUList([phdu, table_hdu1, table_hdu2]) as hdul:
hdul.writeto(filepath, overwrite=True, checksum=True)
return filepath