#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Collection of utility functions that can be used throughout
the corrections package.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
import numpy as np
import pickle
import gzip
import logging
from astropy.io import fits
from bottleneck import nanmedian, nanmean, allnan
from scipy.stats import binned_statistic
# Constants:
mad_to_sigma = 1.482602218505602 #: Conversion constant from MAD to Sigma. Constant is 1/norm.ppf(3/4)
PICKLE_DEFAULT_PROTOCOL = 4 #: Default protocol to use for saving pickle files.
#------------------------------------------------------------------------------
[docs]
def savePickle(fname, obj):
"""
Save an object to file using pickle.
Parameters:
fname (string): File name to save to. If the name ends in '.gz' the file
will be automatically gzipped.
obj (object): Any pickalble object to be saved to file.
"""
if fname.endswith('.gz'):
o = gzip.open
else:
o = open
with o(fname, 'wb') as fid:
pickle.dump(obj, fid, protocol=PICKLE_DEFAULT_PROTOCOL)
#------------------------------------------------------------------------------
[docs]
def loadPickle(fname):
"""
Load an object from file using pickle.
Parameters:
fname (string): File name to save to. If the name ends in '.gz' the file
will be automatically unzipped.
obj (object): Any pickalble object to be saved to file.
Returns:
object: The unpickled object from the file.
"""
if fname.endswith('.gz'):
o = gzip.open
else:
o = open
with o(fname, 'rb') as fid:
return pickle.load(fid)
#------------------------------------------------------------------------------
[docs]
def sphere_distance(ra1, dec1, ra2, dec2):
"""
Calculate the great circle distance between two points using the Vincenty formulae.
Parameters:
ra1 (float or ndarray): Longitude of first point in degrees.
dec1 (float or ndarray): Lattitude of first point in degrees.
ra2 (float or ndarray): Longitude of second point in degrees.
dec2 (float or ndarray): Lattitude of second point in degrees.
Returns:
ndarray: Distance between points in degrees.
Note:
https://en.wikipedia.org/wiki/Great-circle_distance
"""
# Convert angles to radians:
ra1 = np.deg2rad(ra1)
ra2 = np.deg2rad(ra2)
dec1 = np.deg2rad(dec1)
dec2 = np.deg2rad(dec2)
# Calculate distance using Vincenty formulae:
return np.rad2deg(np.arctan2(
np.sqrt( (np.cos(dec2)*np.sin(ra2-ra1))**2 + (np.cos(dec1)*np.sin(dec2) - np.sin(dec1)*np.cos(dec2)*np.cos(ra2-ra1))**2 ),
np.sin(dec1)*np.sin(dec2) + np.cos(dec1)*np.cos(dec2)*np.cos(ra2-ra1)
))
#------------------------------------------------------------------------------
[docs]
def rms_timescale(lc, timescale=3600/86400):
"""
Compute robust RMS on specified timescale. Using MAD scaled to RMS.
Parameters:
lc (``lightkurve.TessLightCurve`` object): Timeseries to calculate RMS for.
timescale (float, optional): Timescale to bin timeseries before calculating RMS. Default=1 hour.
Returns:
float: Robust RMS on specified timescale.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
if len(lc.flux) == 0 or allnan(lc.flux):
return np.nan
if len(lc.time) == 0 or allnan(lc.time):
raise ValueError("Invalid time-vector specified. No valid timestamps.")
time_min = np.nanmin(lc.time)
time_max = np.nanmax(lc.time)
if not np.isfinite(time_min) or not np.isfinite(time_max) or time_max - time_min <= 0:
raise ValueError("Invalid time-vector specified")
# Construct the bin edges seperated by the timescale:
bins = np.arange(time_min, time_max, timescale)
bins = np.append(bins, time_max)
# Bin the timeseries to one hour:
indx = np.isfinite(lc.time) & np.isfinite(lc.flux)
flux_bin, _, _ = binned_statistic(lc.time[indx], lc.flux[indx], nanmean, bins=bins)
# Compute robust RMS value (MAD scaled to RMS)
return mad_to_sigma * nanmedian(np.abs(flux_bin - nanmedian(flux_bin)))
#--------------------------------------------------------------------------------------------------
[docs]
def ptp(lc):
"""
Compute robust Point-To-Point scatter.
Parameters:
lc (``lightkurve.TessLightCurve`` object): Lightcurve to calculate PTP for.
Returns:
float: Robust PTP.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
if len(lc.flux) == 0 or allnan(lc.flux):
return np.nan
if len(lc.time) == 0 or allnan(lc.time):
raise ValueError("Invalid time-vector specified. No valid timestamps.")
return nanmedian(np.abs(np.diff(lc.flux)))
#--------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------
[docs]
class ListHandler(logging.Handler):
"""
A logging.Handler that writes messages into a list object.
The standard logging.QueueHandler/logging.QueueListener can not be used
for this because the QueueListener runs in a private thread, not the
main thread.
.. warning::
This handler is not thread-safe. Do not use it in threaded environments.
"""
[docs]
def __init__(self, *args, message_queue, **kwargs):
"""Initialize by copying the queue and sending everything else to superclass."""
logging.Handler.__init__(self, *args, **kwargs)
self.message_queue = message_queue
[docs]
def emit(self, record):
"""Add the formatted log message (sans newlines) to the queue."""
self.message_queue.append(self.format(record).rstrip('\n'))
#--------------------------------------------------------------------------------------------------
[docs]
class LoggerWriter(object):
"""
File-like object which passes input into a logger.
Can be used together with :py:func:`contextlib.redirect_stdout`
or :py:func:`contextlib.redirect_stderr` to redirect streams to the given logger.
Can be useful for wrapping codes which uses normal :py:func:`print` functions for logging.
.. code-block:: python
:linenos:
logger = logging.getLogger(__name__)
with contextlib.redirect_stdout(LoggerWriter(logger, logging.INFO)):
print("This goes into the logger instead of STDOUT")
.. warning::
This object is not thread-safe. Do not use it in threaded environments.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
[docs]
def __init__(self, logger, level=logging.INFO):
self.logger = logger
self.level = level
[docs]
def write(self, message):
if message.strip() != '':
self.logger.log(self.level, message)
#--------------------------------------------------------------------------------------------------
[docs]
def CadenceType(x):
"""
Type-checker function to be used with argparse for "cadence" inputs.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
return 'ffi' if isinstance(x, str) and x.lower() == 'ffi' else int(x)