Source code for photometry.utilities

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Collection of utility functions that can be used throughout
the photometry package.

.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

import numpy as np
from astropy.io import fits
from bottleneck import move_median, nanmedian, nanmean, allnan, nanargmin, nanargmax
import logging
import tqdm
from scipy.special import erf
from scipy.stats import binned_statistic
import json
import os.path
import fnmatch
import glob
import itertools
import requests
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

# Constants:
mad_to_sigma = 1.482602218505602 #: Constant for converting from MAD to SIGMA. Constant is 1/norm.ppf(3/4)

#--------------------------------------------------------------------------------------------------
[docs]def load_settings(sector=None): with open(os.path.join(os.path.dirname(__file__), 'data', 'settings.json'), 'r') as fid: settings = json.load(fid) if sector is not None: return settings['sectors'][str(sector)] return settings
#--------------------------------------------------------------------------------------------------
[docs]def find_ffi_files(rootdir, sector=None, camera=None, ccd=None): """ Search directory recursively for TESS FFI images in FITS format. Parameters: rootdir (string): Directory to search recursively for TESS FFI images. sector (integer or None, optional): Only return files from the given sector. If ``None``, files from all sectors are returned. camera (integer or None, optional): Only return files from the given camera number (1-4). If ``None``, files from all cameras are returned. ccd (integer or None, optional): Only return files from the given CCD number (1-4). If ``None``, files from all CCDs are returned. Returns: list: List of full paths to FFI FITS files found in directory. The list will be sorted accoridng to the filename of the files, e.g. primarily by time. """ logger = logging.getLogger(__name__) # Create the filename pattern to search for: sector_str = '????' if sector is None else '{0:04d}'.format(sector) camera = '?' if camera is None else str(camera) ccd = '?' if ccd is None else str(ccd) filename_pattern = 'tess*-s{sector:s}-{camera:s}-{ccd:s}-????-[xsab]_ffic.fits*'.format( sector=sector_str, camera=camera, ccd=ccd ) logger.debug("Searching for FFIs in '%s' using pattern '%s'", rootdir, filename_pattern) # Do a recursive search in the directory, finding all files that match the pattern: matches = [] for root, dirnames, filenames in os.walk(rootdir, followlinks=True): for filename in fnmatch.filter(filenames, filename_pattern): matches.append(os.path.join(root, filename)) # Sort the list of files by thir filename: matches.sort(key=lambda x: os.path.basename(x)) return matches
#--------------------------------------------------------------------------------------------------
[docs]def find_tpf_files(rootdir, starid=None, sector=None, camera=None, ccd=None, findmax=None): """ Search directory recursively for TESS Target Pixel Files. Parameters: rootdir (string): Directory to search recursively for TESS TPF files. starid (integer or None, optional): Only return files from the given TIC number. If ``None``, files from all TIC numbers are returned. sector (integer or None, optional): Only return files from the given sector. If ``None``, files from all sectors are returned. camera (integer or None, optional): Only return files from the given camera number (1-4). If ``None``, files from all cameras are returned. ccd (integer or None, optional): Only return files from the given CCD number (1-4). If ``None``, files from all CCDs are returned. findmax (integer or None, optional): Maximum number of files to return. If ``None``, return all files. Note: Filtering on camera and/or ccd will cause the program to read the headers of the files in order to determine the camera and ccd from which they came. This can significantly slow down the query. Returns: list: List of full paths to TPF FITS files found in directory. The list will be sorted accoriding to the filename of the files, e.g. primarily by time. """ logger = logging.getLogger(__name__) # Create the filename pattern to search for: sector_str = '????' if sector is None else '{0:04d}'.format(sector) starid_str = '*' if starid is None else '{0:016d}'.format(starid) filename_pattern = 'tess*-s{sector:s}-{starid:s}-????-[xsab]_tp.fits*'.format( sector=sector_str, starid=starid_str ) # Pattern used for TESS Alert data: sector_str = '??' if sector is None else '{0:02d}'.format(sector) starid_str = '*' if starid is None else '{0:011d}'.format(starid) filename_pattern2 = 'hlsp_tess-data-alerts_tess_phot_{starid:s}-s{sector:s}_tess_v?_tp.fits*'.format( sector=sector_str, starid=starid_str ) logger.debug("Searching for TPFs in '%s' using pattern '%s'", rootdir, filename_pattern) logger.debug("Searching for TPFs in '%s' using pattern '%s'", rootdir, filename_pattern2) # Do a recursive search in the directory, finding all files that match the pattern: breakout = False matches = [] for root, dirnames, filenames in os.walk(rootdir, followlinks=True): for filename in filenames: if fnmatch.fnmatch(filename, filename_pattern) or fnmatch.fnmatch(filename, filename_pattern2): fpath = os.path.join(root, filename) if camera is not None and fits.getval(fpath, 'CAMERA', ext=0) != camera: continue if ccd is not None and fits.getval(fpath, 'CCD', ext=0) != ccd: continue matches.append(fpath) if findmax is not None and len(matches) >= findmax: breakout = True break if breakout: break # Sort the list of files by thir filename: matches.sort(key=lambda x: os.path.basename(x)) return matches
#--------------------------------------------------------------------------------------------------
[docs]def find_hdf5_files(rootdir, sector=None, camera=None, ccd=None): """ Search the input directory for HDF5 files matching constraints. Parameters: rootdir (string): Directory to search for HDF5 files. sector (integer, list or None, optional): Only return files from the given sectors. If ``None``, files from all TIC numbers are returned. camera (integer, list or None, optional): Only return files from the given camera. If ``None``, files from all cameras are returned. ccd (integer, list or None, optional): Only return files from the given ccd. If ``None``, files from all ccds are returned. Returns: list: List of paths to HDF5 files matching constraints. """ if not isinstance(sector, (list, tuple)): sector = (sector,) if not isinstance(camera, (list, tuple)): camera = (1,2,3,4) if camera is None else (camera,) if not isinstance(ccd, (list, tuple)): ccd = (1,2,3,4) if ccd is None else (ccd,) filelst = [] for sector, camera, ccd in itertools.product(sector, camera, ccd): filelst += glob.glob(os.path.join(rootdir, 'sector{0:s}_camera{1:d}_ccd{2:d}.hdf5'.format( '???' if sector is None else '%03d' % sector, camera, ccd ))) return filelst
#--------------------------------------------------------------------------------------------------
[docs]def find_catalog_files(rootdir, sector=None, camera=None, ccd=None): """ Search the input directory for CATALOG (sqlite) files matching constraints. Parameters: rootdir (string): Directory to search for CATALOG files. sector (integer, list or None, optional): Only return files from the given sectors. If ``None``, files from all TIC numbers are returned. camera (integer, list or None, optional): Only return files from the given camera. If ``None``, files from all cameras are returned. ccd (integer, list or None, optional): Only return files from the given ccd. If ``None``, files from all ccds are returned. Returns: list: List of paths to CATALOG files matching constraints. """ if not isinstance(sector, (list, tuple)): sector = (sector,) if not isinstance(camera, (list, tuple)): camera = (1,2,3,4) if camera is None else (camera,) if not isinstance(ccd, (list, tuple)): ccd = (1,2,3,4) if ccd is None else (ccd,) filelst = [] for sector, camera, ccd in itertools.product(sector, camera, ccd): filelst += glob.glob(os.path.join(rootdir, 'catalog_sector{0:s}_camera{1:d}_ccd{2:d}.sqlite'.format( '???' if sector is None else '%03d' % sector, camera, ccd ))) return filelst
#--------------------------------------------------------------------------------------------------
[docs]def load_ffi_fits(path, return_header=False, return_uncert=False): """ Load FFI FITS file. Calibrations columns and rows are trimmed from the image. Parameters: path (str): Path to FITS file. return_header (boolean, optional): Return FITS headers as well. Default is ``False``. Returns: numpy.ndarray: Full Frame Image. list: If ``return_header`` is enabled, will return a dict of the FITS headers. """ with fits.open(path, memmap=True, mode='readonly') as hdu: hdr = hdu[0].header if hdr.get('TELESCOP') == 'TESS' and hdu[1].header.get('NAXIS1') == 2136 and hdu[1].header.get('NAXIS2') == 2078: img = hdu[1].data[0:2048, 44:2092] if return_uncert: imgerr = np.asarray(hdu[2].data[0:2048, 44:2092], dtype='float32') headers = dict(hdu[0].header) headers.update(dict(hdu[1].header)) else: img = hdu[0].data headers = dict(hdu[0].header) if return_uncert: imgerr = np.asarray(hdu[1].data, dtype='float32') # Make sure its an numpy array with the correct data type: img = np.asarray(img, dtype='float32') if return_uncert and return_header: return img, headers, imgerr elif return_uncert: return img, imgerr elif return_header: return img, headers else: return img
#-------------------------------------------------------------------------------------------------- def _move_median_central_1d(x, width_points): y = move_median(x, width_points, min_count=1) y = np.roll(y, -width_points//2+1) for k in range(width_points//2+1): y[k] = nanmedian(x[:(k+2)]) y[-(k+1)] = nanmedian(x[-(k+2):]) return y #--------------------------------------------------------------------------------------------------
[docs]def move_median_central(x, width_points, axis=0): return np.apply_along_axis(_move_median_central_1d, axis, x, width_points)
#--------------------------------------------------------------------------------------------------
[docs]def add_proper_motion(ra, dec, pm_ra, pm_dec, bjd, epoch=2000.0): """ Project coordinates (ra,dec) with proper motions to new epoch. Parameters: ra (float) : Right ascension. dec (float) : Declination. pm_ra (float) : Proper motion in RA (mas/year). pm_dec (float) : Proper motion in Declination (mas/year). bjd (float) : Julian date to calculate coordinates for. epoch (float, optional) : Epoch of ``ra`` and ``dec``. Default=2000. Returns: (float, float) : RA and Declination at the specified date. """ # Convert BJD to epoch (year): epoch_now = (bjd - 2451544.5)/365.25 + 2000.0 # How many years since the catalog's epoch? timeelapsed = epoch_now - epoch # in years # Calculate the dec: decrate = pm_dec/3600000.0 # in degrees/year (assuming original was in mas/year) decindegrees = dec + timeelapsed*decrate # Calculate the unprojected rate of RA motion, using the mean declination between # the catalog and present epoch. rarate = pm_ra/np.cos((dec + timeelapsed*decrate/2.0)*np.pi/180.0)/3600000.0 # in degress of RA/year (assuming original was in mas/year) raindegrees = ra + timeelapsed*rarate # Return the current positions return raindegrees, decindegrees
#--------------------------------------------------------------------------------------------------
[docs]def integratedGaussian(x, y, flux, x_0, y_0, sigma=1): """ Evaluate a 2D symmetrical Gaussian integrated in pixels. Parameters: x (numpy array) : x coordinates at which to evaluate the PSF. y (numpy array) : y coordinates at which to evaluate the PSF. flux (float) : Integrated value. x_0 (float) : Centroid position. y_0 (float) : Centroid position. sigma (float, optional) : Standard deviation of Gaussian. Default=1. Returns: numpy array : 2D Gaussian integrated pixel values at (x,y). Note: Inspired by https://github.com/astropy/photutils/blob/master/photutils/psf/models.py Example: >>> import numpy as np >>> X, Y = np.meshgrid(np.arange(-1,2), np.arange(-1,2)) >>> integratedGaussian(X, Y, 10, 0, 0) array([[ 0.58433556, 0.92564571, 0.58433556], [ 0.92564571, 1.46631496, 0.92564571], [ 0.58433556, 0.92564571, 0.58433556]]) """ return (flux / 4 * ((erf((x - x_0 + 0.5) / (np.sqrt(2) * sigma)) - erf((x - x_0 - 0.5) / (np.sqrt(2) * sigma))) * (erf((y - y_0 + 0.5) / (np.sqrt(2) * sigma)) - erf((y - y_0 - 0.5) / (np.sqrt(2) * sigma)))))
#--------------------------------------------------------------------------------------------------
[docs]def mag2flux(mag, zp=20.60654144): """ Convert from magnitude to flux using scaling relation from aperture photometry. This is an estimate. The scaling is based on fast-track TESS data from sectors 1 and 2. Parameters: mag (float): Magnitude in TESS band. Returns: float: Corresponding flux value """ return np.clip(10**(-0.4*(mag - zp)), 0, None)
#--------------------------------------------------------------------------------------------------
[docs]def sphere_distance(ra1, dec1, ra2, dec2): """ Calculate the great circle distance between two points using the Vincenty formulae. Parameters: ra1 (float or ndarray): Longitude of first point in degrees. dec1 (float or ndarray): Lattitude of first point in degrees. ra2 (float or ndarray): Longitude of second point in degrees. dec2 (float or ndarray): Lattitude of second point in degrees. Returns: ndarray: Distance between points in degrees. Note: https://en.wikipedia.org/wiki/Great-circle_distance """ # Convert angles to radians: ra1 = np.deg2rad(ra1) ra2 = np.deg2rad(ra2) dec1 = np.deg2rad(dec1) dec2 = np.deg2rad(dec2) # Calculate distance using Vincenty formulae: return np.rad2deg(np.arctan2( np.sqrt( (np.cos(dec2)*np.sin(ra2-ra1))**2 + (np.cos(dec1)*np.sin(dec2) - np.sin(dec1)*np.cos(dec2)*np.cos(ra2-ra1))**2 ), np.sin(dec1)*np.sin(dec2) + np.cos(dec1)*np.cos(dec2)*np.cos(ra2-ra1) ))
#--------------------------------------------------------------------------------------------------
[docs]def radec_to_cartesian(radec): """ Convert spherical coordinates as (ra, dec) pairs to cartesian coordinates (x,y,z). Parameters: radec (ndarray): Array with ra-dec pairs in degrees. Returns: ndarray: (x,y,z) coordinates corresponding to input coordinates. """ radec = np.atleast_2d(radec) xyz = np.empty((radec.shape[0], 3), dtype='float64') phi = np.radians(radec[:,0]) theta = np.pi/2 - np.radians(radec[:,1]) xyz[:,0] = np.sin(theta) * np.cos(phi) xyz[:,1] = np.sin(theta) * np.sin(phi) xyz[:,2] = np.cos(theta) return xyz
#--------------------------------------------------------------------------------------------------
[docs]def cartesian_to_radec(xyz): """ Convert cartesian coordinates (x,y,z) to spherical coordinates in ra-dec form. Parameters: radec (ndarray): Array with ra-dec pairs. Returns: ndarray: ra-dec coordinates in degrees corresponding to input coordinates. """ xyz = np.atleast_2d(xyz) radec = np.empty((xyz.shape[0], 2), dtype='float64') radec[:,1] = np.pi/2 - np.arccos(xyz[:,2]) radec[:,0] = np.arctan2(xyz[:,1], xyz[:,0]) indx = radec[:,0] < 0 radec[indx,0] = 2*np.pi - np.abs(radec[indx,0]) indx = radec[:,0] > 2*np.pi radec[indx,0] -= 2*np.pi return np.degrees(radec)
#--------------------------------------------------------------------------------------------------
[docs]def rms_timescale(time, flux, timescale=3600/86400): """ Compute robust RMS on specified timescale. Using MAD scaled to RMS. Parameters: time (ndarray): Timestamps in days. flux (ndarray): Flux to calculate RMS for. timescale (float, optional): Timescale to bin timeseries before calculating RMS. Default=1 hour. Returns: float: Robust RMS on specified timescale. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ time = np.asarray(time) flux = np.asarray(flux) if len(flux) == 0 or allnan(flux): return np.nan if len(time) == 0 or allnan(time): raise ValueError("Invalid time-vector specified. No valid timestamps.") time_min = np.nanmin(time) time_max = np.nanmax(time) if not np.isfinite(time_min) or not np.isfinite(time_max) or time_max - time_min <= 0: raise ValueError("Invalid time-vector specified") # Construct the bin edges seperated by the timescale: bins = np.arange(time_min, time_max, timescale) bins = np.append(bins, time_max) # Bin the timeseries to one hour: indx = np.isfinite(flux) flux_bin, _, _ = binned_statistic(time[indx], flux[indx], nanmean, bins=bins) # Compute robust RMS value (MAD scaled to RMS) return mad_to_sigma * nanmedian(np.abs(flux_bin - nanmedian(flux_bin)))
#--------------------------------------------------------------------------------------------------
[docs]def find_nearest(array, value): """ Search array for value and return the index where the value is closest. Parameters: array (ndarray): Array to search. value: Value to search array for. Returns: int: Index of ``array`` closest to ``value``. Raises: ValueError: If ``value`` is NaN. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ if np.isnan(value): raise ValueError("Invalid search value") if np.isposinf(value): return nanargmax(array) if np.isneginf(value): return nanargmin(array) return nanargmin(np.abs(array - value))
#idx = np.searchsorted(array, value, side='left') #if idx > 0 and (idx == len(array) or abs(value - array[idx-1]) <= abs(value - array[idx])): # return idx-1 #else: # return idx #--------------------------------------------------------------------------------------------------
[docs]def download_file(url, destination, position_holders=None, position_lock=None): """ Download file from URL and place into specified destination. Parameters: url (string): URL to file to be downloaded. destination (string): Path where to save file. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ logger = logging.getLogger(__name__) tqdm_settings = { 'unit': 'B', 'unit_scale': True, 'position': None, 'leave': True, 'disable': not logger.isEnabledFor(logging.INFO) } if position_holders is not None: tqdm_settings['leave'] = False position_lock.acquire() tqdm_settings['position'] = position_holders.index(False) position_holders[tqdm_settings['position']] = True position_lock.release() try: # Start stream from URL and throw an error for bad status codes: response = requests.get(url, stream=True, allow_redirects=True) response.raise_for_status() total_size = int(response.headers.get('content-length', 0)) block_size = 1024 with open(destination, 'wb') as handle: with tqdm.tqdm(total=total_size, **tqdm_settings) as pbar: for block in response.iter_content(block_size): handle.write(block) pbar.update(len(block)) if os.path.getsize(destination) != total_size: raise Exception("File not downloaded correctly") except: # noqa: E722 logger.exception("Could not download file") if os.path.exists(destination): os.remove(destination) raise # Pause before returning to give progress bar time to write. if position_holders is not None: position_lock.acquire() position_holders[tqdm_settings['position']] = False position_lock.release()
#--------------------------------------------------------------------------------------------------
[docs]def download_parallel(urls, workers=4): """ Download several files in parallel using multiple threads. Parameters: urls (iterable): List of files to download. Each element should consist of a list or tuple, containing two elements: The URL to download, and the path to the destination where the file should be saved. workers (integer, optional): Number of threads to use for downloading. Default=4. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ # Don't overcomplicate things for a singe file: if len(urls) == 1: download_file(urls[0][0], urls[0][1]) return workers = min(workers, len(urls)) position_holders = [False] * workers plock = Lock() def _wrapper(arg): download_file(arg[0], arg[1], position_holders=position_holders, position_lock=plock) with ThreadPoolExecutor(max_workers=workers) as executor: executor.map(_wrapper, urls)
#--------------------------------------------------------------------------------------------------
[docs]class TqdmLoggingHandler(logging.Handler):
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] def emit(self, record): try: msg = self.format(record) tqdm.tqdm.write(msg) self.flush() except (KeyboardInterrupt, SystemExit): raise except: # noqa: E722 self.handleError(record)
#--------------------------------------------------------------------------------------------------
[docs]class ListHandler(logging.Handler): """ A logging.Handler that writes messages into a list object. The standard logging.QueueHandler/logging.QueueListener can not be used for this because the QueueListener runs in a private thread, not the main thread. .. warning:: This handler is not thread-safe. Do not use it in threaded environments. """
[docs] def __init__(self, *args, message_queue, **kwargs): """Initialize by copying the queue and sending everything else to superclass.""" super().__init__(*args, **kwargs) self.message_queue = message_queue
[docs] def emit(self, record): """Add the formatted log message (sans newlines) to the queue.""" self.message_queue.append(self.format(record).rstrip('\n'))
#--------------------------------------------------------------------------------------------------
[docs]class LoggerWriter(object): """ File-like object which passes input into a logger. Can be used together with :py:func:`contextlib.redirect_stdout` or :py:func:`contextlib.redirect_stderr` to redirect streams to the given logger. Can be useful for wrapping codes which uses normal :py:func:`print` functions for logging. .. code-block:: python :linenos: logger = logging.getLogger(__name__) with contextlib.redirect_stdout(LoggerWriter(logger, logging.INFO)): print("This goes into the logger instead of STDOUT") .. warning:: This object is not thread-safe. Do not use it in threaded environments. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """
[docs] def __init__(self, logger, level=logging.INFO): self.logger = logger self.level = level
[docs] def write(self, message): if message.strip() != '': self.logger.log(self.level, message)