Source code for photometry.utilities

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Collection of utility functions that can be used throughout
the photometry package.

.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

import logging
import os.path
import contextlib
import numpy as np
from bottleneck import move_median, nanmedian, nanmean, allnan, nanargmin, nanargmax
import tqdm
from scipy.special import erf
from scipy.stats import binned_statistic
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import concurrent.futures
from threading import Lock

# Constants:
mad_to_sigma = 1.482602218505602 #: Constant for converting from MAD to SIGMA. Constant is 1/norm.ppf(3/4)

#--------------------------------------------------------------------------------------------------
[docs] def to_tuple(inp, default=None): """ Convert iterable or single values to tuple. This function is used for converting inputs, perticularly for preparing input to functions cached with :func:`functools.lru_cache`, to ensure inputs are hashable. Parameters: inp: Input to convert to tuple. default: If ``input`` is ``None`` return this instead. Returns: tuple: ``inp`` converted to tuple. """ if inp is None: return default if isinstance(inp, (list, set, frozenset, np.ndarray)): return tuple(inp) if isinstance(inp, (int, float, bool, str)): return (inp, ) return inp
#-------------------------------------------------------------------------------------------------- def _move_median_central_1d(x, width_points): y = move_median(x, width_points, min_count=1) y = np.roll(y, -width_points//2+1) for k in range(width_points//2+1): y[k] = nanmedian(x[:(k+2)]) y[-(k+1)] = nanmedian(x[-(k+2):]) return y #--------------------------------------------------------------------------------------------------
[docs] def move_median_central(x, width_points, axis=0): return np.apply_along_axis(_move_median_central_1d, axis, x, width_points)
#--------------------------------------------------------------------------------------------------
[docs] def add_proper_motion(ra, dec, pm_ra, pm_dec, bjd, epoch=2000.0): """ Project coordinates (ra,dec) with proper motions to new epoch. Parameters: ra (float) : Right ascension. dec (float) : Declination. pm_ra (float) : Proper motion in RA (mas/year). pm_dec (float) : Proper motion in Declination (mas/year). bjd (float) : Julian date to calculate coordinates for. epoch (float, optional) : Epoch of ``ra`` and ``dec``. Default=2000. Returns: (float, float) : RA and Declination at the specified date. """ # Convert BJD to epoch (year): epoch_now = (bjd - 2451544.5)/365.25 + 2000.0 # How many years since the catalog's epoch? timeelapsed = epoch_now - epoch # in years # Calculate the dec: decrate = pm_dec/3600000.0 # in degrees/year (assuming original was in mas/year) decindegrees = dec + timeelapsed*decrate # Calculate the unprojected rate of RA motion, using the mean declination between # the catalog and present epoch. rarate = pm_ra/np.cos((dec + timeelapsed*decrate/2.0)*np.pi/180.0)/3600000.0 # in degress of RA/year (assuming original was in mas/year) raindegrees = ra + timeelapsed*rarate # Return the current positions return raindegrees, decindegrees
#--------------------------------------------------------------------------------------------------
[docs] def integratedGaussian(x, y, flux, x_0, y_0, sigma=1): """ Evaluate a 2D symmetrical Gaussian integrated in pixels. Parameters: x (numpy.ndarray): x coordinates at which to evaluate the PSF. y (numpy.ndarray): y coordinates at which to evaluate the PSF. flux (float): Integrated value. x_0 (float): Centroid position. y_0 (float): Centroid position. sigma (float, optional): Standard deviation of Gaussian. Default=1. Returns: numpy array : 2D Gaussian integrated pixel values at (x,y). Note: Inspired by https://github.com/astropy/photutils/blob/master/photutils/psf/models.py Example: >>> import numpy as np >>> X, Y = np.meshgrid(np.arange(-1,2), np.arange(-1,2)) >>> integratedGaussian(X, Y, 10, 0, 0) array([[ 0.58433556, 0.92564571, 0.58433556], [ 0.92564571, 1.46631496, 0.92564571], [ 0.58433556, 0.92564571, 0.58433556]]) """ denom = np.sqrt(2) * sigma return (flux / 4 * ((erf((x - x_0 + 0.5) / denom) - erf((x - x_0 - 0.5) / denom)) * (erf((y - y_0 + 0.5) / denom) - erf((y - y_0 - 0.5) / denom)))) # noqa: ET126
#--------------------------------------------------------------------------------------------------
[docs] def mag2flux(mag, zp=20.451): """ Convert from magnitude to flux using scaling relation from aperture photometry. This is an estimate. The default scaling is based on TASOC Data Release 5 from sectors 1-5. Parameters: mag (ndarray): Magnitude in TESS band. zp (float): Zero-point to use in scaling. Default is estimated from TASOC Data Release 5 from TESS sectors 1-5. Returns: ndarray: Corresponding flux value """ return np.clip(10**(-0.4*(mag - zp)), 0, None)
#--------------------------------------------------------------------------------------------------
[docs] def sphere_distance(ra1, dec1, ra2, dec2): """ Calculate the great circle distance between two points using the Vincenty formulae. Parameters: ra1 (float or ndarray): Longitude of first point in degrees. dec1 (float or ndarray): Lattitude of first point in degrees. ra2 (float or ndarray): Longitude of second point in degrees. dec2 (float or ndarray): Lattitude of second point in degrees. Returns: ndarray: Distance between points in degrees. Note: https://en.wikipedia.org/wiki/Great-circle_distance """ # Convert angles to radians: ra1 = np.deg2rad(ra1) ra2 = np.deg2rad(ra2) dec1 = np.deg2rad(dec1) dec2 = np.deg2rad(dec2) # Calculate distance using Vincenty formulae: return np.rad2deg(np.arctan2( np.sqrt( (np.cos(dec2)*np.sin(ra2-ra1))**2 + (np.cos(dec1)*np.sin(dec2) - np.sin(dec1)*np.cos(dec2)*np.cos(ra2-ra1))**2 ), np.sin(dec1)*np.sin(dec2) + np.cos(dec1)*np.cos(dec2)*np.cos(ra2-ra1) ))
#--------------------------------------------------------------------------------------------------
[docs] def radec_to_cartesian(radec): """ Convert spherical coordinates as (ra, dec) pairs to cartesian coordinates (x,y,z). Parameters: radec (ndarray): Array with ra-dec pairs in degrees. Returns: ndarray: (x,y,z) coordinates corresponding to input coordinates. """ radec = np.atleast_2d(radec) xyz = np.empty((radec.shape[0], 3), dtype='float64') phi = np.radians(radec[:,0]) theta = np.pi/2 - np.radians(radec[:,1]) xyz[:,0] = np.sin(theta) * np.cos(phi) xyz[:,1] = np.sin(theta) * np.sin(phi) xyz[:,2] = np.cos(theta) return xyz
#--------------------------------------------------------------------------------------------------
[docs] def cartesian_to_radec(xyz): """ Convert cartesian coordinates (x,y,z) to spherical coordinates in ra-dec form. Parameters: radec (ndarray): Array with ra-dec pairs. Returns: ndarray: ra-dec coordinates in degrees corresponding to input coordinates. """ xyz = np.atleast_2d(xyz) radec = np.empty((xyz.shape[0], 2), dtype='float64') radec[:,1] = np.pi/2 - np.arccos(xyz[:,2]) radec[:,0] = np.arctan2(xyz[:,1], xyz[:,0]) indx = radec[:,0] < 0 radec[indx,0] = 2*np.pi - np.abs(radec[indx,0]) indx = radec[:,0] > 2*np.pi radec[indx,0] -= 2*np.pi return np.degrees(radec)
#--------------------------------------------------------------------------------------------------
[docs] def rms_timescale(time, flux, timescale=3600/86400): """ Compute robust RMS on specified timescale. Using MAD scaled to RMS. Parameters: time (ndarray): Timestamps in days. flux (ndarray): Flux to calculate RMS for. timescale (float, optional): Timescale to bin timeseries before calculating RMS. Default=1 hour. Returns: float: Robust RMS on specified timescale. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ time = np.asarray(time) flux = np.asarray(flux) if len(flux) == 0 or allnan(flux): return np.nan if len(time) == 0 or allnan(time): raise ValueError("Invalid time-vector specified. No valid timestamps.") time_min = np.nanmin(time) time_max = np.nanmax(time) if not np.isfinite(time_min) or not np.isfinite(time_max) or time_max - time_min <= 0: raise ValueError("Invalid time-vector specified") # Construct the bin edges seperated by the timescale: bins = np.arange(time_min, time_max, timescale) bins = np.append(bins, time_max) # Bin the timeseries to one hour: indx = np.isfinite(flux) flux_bin, _, _ = binned_statistic(time[indx], flux[indx], nanmean, bins=bins) # Compute robust RMS value (MAD scaled to RMS) return mad_to_sigma * nanmedian(np.abs(flux_bin - nanmedian(flux_bin)))
#--------------------------------------------------------------------------------------------------
[docs] def find_nearest(array, value): """ Search array for value and return the index where the value is closest. Parameters: array (ndarray): Array to search. value: Value to search array for. Returns: int: Index of ``array`` closest to ``value``. Raises: ValueError: If ``value`` is NaN. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ if np.isnan(value): raise ValueError("Invalid search value") if np.isposinf(value): return nanargmax(array) if np.isneginf(value): return nanargmin(array) return nanargmin(np.abs(array - value))
#idx = np.searchsorted(array, value, side='left') #if idx > 0 and (idx == len(array) or abs(value - array[idx-1]) <= abs(value - array[idx])): # return idx-1 #else: # return idx #--------------------------------------------------------------------------------------------------
[docs] def download_file(url, destination, desc=None, timeout=60, position_holders=None, position_lock=None, showprogress=None): """ Download file from URL and place into specified destination. Parameters: url (str): URL to file to be downloaded. destination (str): Path where to save file. desc (str, optional): Description to write next to progress-bar. timeout (float): Time to wait for server response in seconds. Default=60. showprogress (bool): Force showing the progress bar. If ``None``, the progressbar is shown based on the logging level and output type. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ logger = logging.getLogger(__name__) tqdm_settings = { 'unit': 'B', 'unit_scale': True, 'position': None, 'leave': True, 'disable': None if logger.isEnabledFor(logging.INFO) else True, 'desc': desc } if showprogress is not None: tqdm_settings['disable'] = not showprogress if position_holders is not None: tqdm_settings['leave'] = False position_lock.acquire() tqdm_settings['position'] = position_holders.index(False) position_holders[tqdm_settings['position']] = True position_lock.release() # Strategy for retrying failing requests several times # with a small increasing sleep in between: retry_strategy = Retry( total=3, backoff_factor=1, status_forcelist=[413, 429, 500, 502, 503, 504], allowed_methods=['HEAD', 'GET'], ) adapter = HTTPAdapter(max_retries=retry_strategy) try: with requests.Session() as http: http.mount("https://", adapter) http.mount("http://", adapter) # Start stream from URL and throw an error for bad status codes: response = http.get(url, stream=True, allow_redirects=True, timeout=timeout) response.raise_for_status() total_size = response.headers.get('content-length', None) if total_size is not None: total_size = int(total_size) block_size = 1024 with open(destination, 'wb') as handle: with tqdm.tqdm(total=total_size, **tqdm_settings) as pbar: for block in response.iter_content(block_size): handle.write(block) pbar.update(len(block)) if total_size is not None and os.path.getsize(destination) != total_size: raise RuntimeError("File not downloaded correctly") except: # noqa: E722, pragma: no cover logger.exception("Could not download file") with contextlib.suppress(FileNotFoundError): os.remove(destination) raise finally: # Pause before returning to give progress bar time to write. if position_holders is not None: position_lock.acquire() position_holders[tqdm_settings['position']] = False position_lock.release()
#--------------------------------------------------------------------------------------------------
[docs] def download_parallel(urls, workers=4, timeout=60, showprogress=None): """ Download several files in parallel using multiple threads. Parameters: urls (iterable): List of files to download. Each element should consist of a list or tuple, containing two elements: The URL to download, and the path to the destination where the file should be saved. workers (int, optional): Number of threads to use for downloading. Default=4. timeout (float): Time to wait for server response in seconds. Default=60. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """ # Don't overcomplicate things for a singe file: if len(urls) == 1: download_file(urls[0][0], urls[0][1], timeout=timeout, showprogress=showprogress) return workers = min(workers, len(urls)) position_holders = [False] * workers plock = Lock() def _wrapper(arg): download_file(arg[0], arg[1], timeout=timeout, showprogress=showprogress, position_holders=position_holders, position_lock=plock) errors = [] with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(_wrapper, url): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: future.result() except: # noqa: E722, pragma: no cover errors.append(url[0]) if errors: raise RuntimeError("Errors encountered during download of the following URLs:\n%s" % '\n'.join(errors))
#--------------------------------------------------------------------------------------------------
[docs] class TqdmLoggingHandler(logging.Handler):
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] def emit(self, record): try: msg = self.format(record) tqdm.tqdm.write(msg) self.flush() except (KeyboardInterrupt, SystemExit): # pragma: no cover raise except: # noqa: E722, pragma: no cover self.handleError(record)
#--------------------------------------------------------------------------------------------------
[docs] class ListHandler(logging.Handler): """ A logging.Handler that writes messages into a list object. The standard logging.QueueHandler/logging.QueueListener can not be used for this because the QueueListener runs in a private thread, not the main thread. .. warning:: This handler is not thread-safe. Do not use it in threaded environments. """
[docs] def __init__(self, *args, message_queue, **kwargs): """Initialize by copying the queue and sending everything else to superclass.""" super().__init__(*args, **kwargs) self.message_queue = message_queue
[docs] def emit(self, record): """Add the formatted log message (sans newlines) to the queue.""" self.message_queue.append(self.format(record).rstrip('\n'))
#--------------------------------------------------------------------------------------------------
[docs] class LoggerWriter(object): """ File-like object which passes input into a logger. Can be used together with :py:func:`contextlib.redirect_stdout` or :py:func:`contextlib.redirect_stderr` to redirect streams to the given logger. Can be useful for wrapping codes which uses normal :py:func:`print` functions for logging. .. code-block:: python :linenos: logger = logging.getLogger(__name__) with contextlib.redirect_stdout(LoggerWriter(logger, logging.INFO)): print("This goes into the logger instead of STDOUT") .. warning:: This object is not thread-safe. Do not use it in threaded environments. .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk> """
[docs] def __init__(self, logger, level=logging.INFO): self.logger = logger self.level = level
[docs] def write(self, message): if message.strip() != '': self.logger.log(self.level, message)
[docs] def flush(self): pass