#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
A TaskManager which keeps track of which targets to process.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
import numpy as np
import os
import sqlite3
import logging
import json
import contextlib
import tempfile
from functools import lru_cache
from numpy import atleast_1d
from . import STATUS
#--------------------------------------------------------------------------------------------------
def _hashable_constraint_value(value):
    """
    Convert a list or numpy array into a (flat) tuple, so it can be used as a cache key.
    All other values are returned unchanged.
    """
    if isinstance(value, (list, np.ndarray)):
        return tuple(np.ravel(value).tolist())
    return value


@lru_cache(maxsize=10)
def _build_constraints_cached(priority, starid, sector, cadence, camera, ccd, cbv_area,
    datasource, tmag_min, tmag_max):
    """
    Cached worker for :func:`build_constraints`. All arguments must be hashable.

    Returns:
        tuple: Strings containing constraints for the database. A tuple is returned so
            that the cached value can not be modified by callers.
    """
    constraints = []
    if priority is not None:
        constraints.append('todolist.priority IN (' + ','.join([str(int(c)) for c in atleast_1d(priority)]) + ')')
    if starid is not None:
        constraints.append('todolist.starid IN (' + ','.join([str(int(c)) for c in atleast_1d(starid)]) + ')')
    if sector is not None:
        constraints.append('todolist.sector IN (' + ','.join([str(int(c)) for c in atleast_1d(sector)]) + ')')
    if cadence == 'ffi':
        constraints.append("todolist.datasource='ffi'")
    elif cadence is not None:
        constraints.append('todolist.cadence IN (' + ','.join([str(int(c)) for c in atleast_1d(cadence)]) + ')')
    if camera is not None:
        constraints.append('todolist.camera IN (' + ','.join([str(int(c)) for c in atleast_1d(camera)]) + ')')
    if ccd is not None:
        constraints.append('todolist.ccd IN (' + ','.join([str(int(c)) for c in atleast_1d(ccd)]) + ')')
    if cbv_area is not None:
        constraints.append('todolist.cbv_area IN (' + ','.join([str(int(c)) for c in atleast_1d(cbv_area)]) + ')')
    if tmag_min is not None or tmag_max is not None:
        # To avoid having three separate cases, we join all cases by
        # putting in dummy upper and lower bounds in case they are
        # not provided. The values should be outside the range on any normal stars:
        tmag_min = -99 if tmag_min is None else tmag_min
        tmag_max = 99 if tmag_max is None else tmag_max
        # Secondary targets (datasource 'tpf:<starid>') are selected based on the Tmag of
        # the primary target whose TPF they are found in:
        constraints.append(f"((todolist.datasource NOT LIKE 'tpf:%' AND todolist.tmag BETWEEN {tmag_min:f} AND {tmag_max:f}) OR (todolist.datasource LIKE 'tpf:%' AND CAST(SUBSTR(todolist.datasource,5) AS INTEGER) IN (SELECT DISTINCT starid FROM todolist t2 WHERE t2.datasource='tpf' AND t2.tmag BETWEEN {tmag_min:f} AND {tmag_max:f})))")
    if datasource is not None:
        constraints.append("todolist.datasource='ffi'" if datasource == 'ffi' else "todolist.datasource!='ffi'")
    return tuple(constraints)


def build_constraints(priority=None, starid=None, sector=None, cadence=None,
    camera=None, ccd=None, cbv_area=None, datasource=None, tmag_min=None, tmag_max=None,
    return_list=False):
    """
    Build constraints for database query from given parameters.

    For ``tmag_min`` and ``tmag_max`` constraints, these limits are put on the primary target
    for all secondary targets. This means that a faint target will still be processed if it is
    in the TPF of a bright target. This is because this constraint is primarily used for
    processing bright targets separately since these require more memory.

    The integer parameters accept either a single value or a list/tuple/array of values.
    Results are cached, and lists and arrays are converted to tuples so they can be used
    as cache keys.

    Parameters:
        priority (int, optional): Only return task matching this priority.
        starid (int, optional): Only return tasks matching this starid.
        sector (int, optional): Only return tasks matching this Sector.
        cadence (int, optional): Only return tasks matching this cadence.
            The special value ``'ffi'`` selects tasks with datasource ``'ffi'``.
        camera (int, optional): Only return tasks matching this camera.
        ccd (int, optional): Only return tasks matching this CCD.
        cbv_area (int, optional): Only return tasks matching this CBV-AREA.
        datasource (str, optional): Only return tasks from this datasource.
            Choices are ``'tpf'`` and ``'ffi'``.
        tmag_min (float, optional): Lower/bright limit on Tmag.
        tmag_max (float, optional): Upper/faint limit on Tmag.
        return_list (bool, optional): Return the list of constraints instead of a string.
            Default=False.

    Returns:
        str or list: If ``return_list`` is True, a new list of strings containing
            constraints for the database. The constraints should be joined with "AND" to
            have the desired effect. Otherwise a string starting with ``' AND '`` with
            all constraints joined, or an empty string if there are no constraints.

    .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
    """
    constraints = _build_constraints_cached(*(
        _hashable_constraint_value(value)
        for value in (priority, starid, sector, cadence, camera, ccd, cbv_area,
            datasource, tmag_min, tmag_max)
    ))

    # If asked for it, return the list if constraints otherwise return string
    # which fits into the other queries done by the TaskManager:
    if return_list:
        # Always hand out a fresh list, so callers can not modify the cached value:
        return list(constraints)
    return (' AND ' + ' AND '.join(constraints)) if constraints else ''


# Keep the cache-management API which was available when build_constraints itself
# was decorated with lru_cache:
build_constraints.cache_clear = _build_constraints_cached.cache_clear
build_constraints.cache_info = _build_constraints_cached.cache_info
#--------------------------------------------------------------------------------------------------
class TaskManager(object):
    """
    A TaskManager which keeps track of which targets to process.

    The processing state is kept in the ``todolist`` table of an SQLite TODO-file.
    Results are stored in the ``diagnostics`` and ``photometry_skipped`` tables,
    which are created if they do not already exist.

    The TaskManager can be used as a context manager, in which case :meth:`close`
    is called when the ``with`` block is left.
    """

    #: Columns of the todolist table returned by get_task and get_random_task.
    _TASK_COLUMNS = 'priority,starid,method,sector,camera,ccd,cadence,datasource,tmag'

    def __init__(self, todo_file, cleanup=False, overwrite=False, cleanup_constraints=None,
        summary=None, summary_interval=200, load_into_memory=False, backup_interval=10000):
        """
        Initialize the TaskManager which keeps track of which targets to process.

        Parameters:
            todo_file (string): Path to the TODO-file, or to a directory containing
                a ``todo.sqlite`` file.
            cleanup (boolean, optional): Perform cleanup/optimization (VACUUM) of the
                TODO-file during initialization. Default=False.
            overwrite (boolean, optional): Restart calculation from the beginning, discarding
                any previous results. Implies ``cleanup=True``. Default=False.
            cleanup_constraints (dict or list, optional): Constraints limiting which tasks with
                status STARTED, ABORT or ERROR are reset. A dict is passed on to
                :func:`build_constraints`, while a list is used as raw SQL constraints
                joined with "AND". If not specified, all such tasks are reset.
            summary (string, optional): Path to file where to periodically write a progress summary.
                The output file will be in JSON format. Default=None.
            summary_interval (int, optional): Interval at which summary file is updated.
                Setting this to 1 will mean writing the file after every tasks completes.
                If None, the file is only written at initialization and by :meth:`close`.
                Default=200.
            load_into_memory (boolean, optional): Work on an in-memory copy of the TODO-file,
                which is written back to disk by :meth:`backup`. Default=False.
            backup_interval (int, optional): Number of saved results between calls to
                :meth:`backup`. None disables the periodic backups. Default=10000.

        Raises:
            FileNotFoundError: If TODO-file could not be found.
            ValueError: If ``cleanup_constraints`` or ``backup_interval`` are invalid, or if
                CADENCE information could not be added to an old TODO-file.

        .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
        """
        if cleanup_constraints is not None and not isinstance(cleanup_constraints, (dict, list)):
            raise ValueError("cleanup_constraints should be dict or list")
        if backup_interval is not None and int(backup_interval) <= 0:
            raise ValueError("Invalid backup_interval")

        if os.path.isdir(todo_file):
            todo_file = os.path.join(todo_file, 'todo.sqlite')
        if not os.path.exists(todo_file):
            raise FileNotFoundError('Could not find TODO-file')

        self.todo_file = os.path.abspath(todo_file)
        self.overwrite = overwrite
        self.summary_file = summary
        self.summary_interval = None if summary_interval is None else int(summary_interval)
        self.load_into_memory = load_into_memory
        self.backup_interval = None if backup_interval is None else int(backup_interval)
        self.summary_counter = 0
        self._results_saved_counter = 0

        self._setup_logging()
        self._open_database()

        # Reset the status of everything for a new run:
        if overwrite:
            self.cursor.execute("UPDATE todolist SET status=NULL;")
            self.cursor.execute("DROP TABLE IF EXISTS diagnostics;")
            self.cursor.execute("DROP TABLE IF EXISTS photometry_skipped;")
            self.conn.commit()
            cleanup = True # Enforce a cleanup after deleting old results

        self._create_tables()
        self._upgrade_schema()
        self._reset_unfinished(cleanup_constraints)

        # Analyze the tables for better query planning:
        self.logger.debug("Analyzing database...")
        self.cursor.execute("ANALYZE;")

        self._init_summary()

        # Run a cleanup/optimization of the database before we get started:
        if cleanup:
            self._vacuum()

    #----------------------------------------------------------------------------------------------
    def _setup_logging(self):
        """Get the module logger, attaching a console handler if it has no handlers."""
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        self.logger = logging.getLogger(__name__)
        if not self.logger.hasHandlers():
            self.logger.addHandler(console)
        self.logger.setLevel(logging.INFO)

    #----------------------------------------------------------------------------------------------
    def _open_database(self):
        """Open the TODO-file (possibly as an in-memory copy) and set connection PRAGMAs."""
        if self.load_into_memory:
            self.logger.debug('Creating in-memory copy of database...')
            self.conn = sqlite3.connect(':memory:')
            journal_mode = 'MEMORY'
            synchronous = 'OFF'
            # Copy the file on disk, opened read-only, into the in-memory database:
            with contextlib.closing(sqlite3.connect('file:' + self.todo_file + '?mode=ro', uri=True)) as source:
                source.backup(self.conn)
        else:
            self.conn = sqlite3.connect(self.todo_file)
            journal_mode = 'TRUNCATE'
            synchronous = 'NORMAL'

        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()
        self.cursor.execute("PRAGMA foreign_keys=ON;")
        self.cursor.execute("PRAGMA locking_mode=EXCLUSIVE;")
        self.cursor.execute(f"PRAGMA journal_mode={journal_mode:s};")
        self.cursor.execute(f"PRAGMA synchronous={synchronous:s};")
        self.cursor.execute("PRAGMA temp_store=MEMORY;")
        self.conn.commit()

    #----------------------------------------------------------------------------------------------
    def _create_tables(self):
        """Create the diagnostics and photometry_skipped tables and indices if missing."""
        self.cursor.execute("""CREATE TABLE IF NOT EXISTS diagnostics (
            priority INTEGER PRIMARY KEY ASC NOT NULL,
            lightcurve TEXT,
            method_used TEXT NOT NULL,
            elaptime REAL NOT NULL,
            worker_wait_time REAL,
            mean_flux DOUBLE PRECISION,
            variance DOUBLE PRECISION,
            variability DOUBLE PRECISION,
            rms_hour DOUBLE PRECISION,
            ptp DOUBLE PRECISION,
            pos_row REAL,
            pos_column REAL,
            contamination REAL,
            mask_size INTEGER,
            edge_flux REAL,
            stamp_width INTEGER,
            stamp_height INTEGER,
            stamp_resizes INTEGER,
            errors TEXT,
            FOREIGN KEY (priority) REFERENCES todolist(priority) ON DELETE CASCADE ON UPDATE CASCADE
        );""")
        self.cursor.execute("""CREATE TABLE IF NOT EXISTS photometry_skipped (
            priority INTEGER NOT NULL,
            skipped_by INTEGER NOT NULL,
            FOREIGN KEY (priority) REFERENCES todolist(priority) ON DELETE CASCADE ON UPDATE CASCADE,
            FOREIGN KEY (skipped_by) REFERENCES todolist(priority) ON DELETE RESTRICT ON UPDATE CASCADE
        );""")
        self.cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS diagnostics_lightcurve_idx ON diagnostics (lightcurve);")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS todolist_datasource_idx ON todolist (datasource);")
        self.conn.commit()

    #----------------------------------------------------------------------------------------------
    def _upgrade_schema(self):
        """Add/remove columns so TODO-files created by older versions can still be used."""
        # Add CADENCE column to todolist, if it doesn't already exist:
        self.cursor.execute("PRAGMA table_info(todolist);")
        existing_columns = [r['name'] for r in self.cursor.fetchall()]
        if 'cadence' not in existing_columns:
            self.logger.debug("Adding CADENCE column to todolist")
            self.cursor.execute("BEGIN TRANSACTION;")
            self.cursor.execute("ALTER TABLE todolist ADD COLUMN cadence INTEGER DEFAULT NULL;")
            # Fill in the cadences which can be inferred from datasource and sector:
            self.cursor.execute("UPDATE todolist SET cadence=1800 WHERE datasource='ffi' AND sector < 27;")
            self.cursor.execute("UPDATE todolist SET cadence=600 WHERE datasource='ffi' AND sector >= 27 AND sector <= 55;")
            self.cursor.execute("UPDATE todolist SET cadence=120 WHERE datasource!='ffi' AND sector < 27;")
            self.cursor.execute("SELECT COUNT(*) AS antal FROM todolist WHERE cadence IS NULL;")
            if self.cursor.fetchone()['antal'] > 0:
                # close() rolls back the open transaction before closing the connection:
                self.close()
                raise ValueError("TODO-file does not contain CADENCE information and it could not be determined automatically. Please recreate TODO-file.")
            self.conn.commit()

        # Add/remove columns in the diagnostics table, if they are missing or obsolete:
        self.cursor.execute("PRAGMA table_info(diagnostics);")
        existing_columns = [r['name'] for r in self.cursor.fetchall()]
        if 'edge_flux' not in existing_columns:
            self.logger.debug("Adding edge_flux column to diagnostics")
            self.cursor.execute("ALTER TABLE diagnostics ADD COLUMN edge_flux REAL DEFAULT NULL")
            self.conn.commit()
        if 'worker_wait_time' not in existing_columns:
            self.logger.debug("Adding worker_wait_time column to diagnostics")
            self.cursor.execute("ALTER TABLE diagnostics ADD COLUMN worker_wait_time REAL DEFAULT NULL")
            self.conn.commit()
        if 'method_used' not in existing_columns:
            # Since this one is NOT NULL, we have to do some magic to fill out the
            # new column after creation, by finding keywords in other columns.
            # This can be a pretty slow process, but it only has to be done once.
            self.logger.debug("Adding method_used column to diagnostics")
            self.cursor.execute("BEGIN TRANSACTION;")
            self.cursor.execute("ALTER TABLE diagnostics ADD COLUMN method_used TEXT NOT NULL DEFAULT 'aperture';")
            for m in ('aperture', 'halo', 'psf', 'linpsf'):
                self.cursor.execute("UPDATE diagnostics SET method_used=? WHERE priority IN (SELECT priority FROM todolist WHERE method=?);", [m, m])
            self.cursor.execute("UPDATE diagnostics SET method_used='halo' WHERE method_used='aperture' AND errors LIKE '%Automatically switched to Halo photometry%';")
            self.conn.commit()
        if 'starid' in existing_columns:
            # Drop this column from the diagnostics table, since the information is already in
            # the todolist table.
            self.cursor.execute("ALTER TABLE diagnostics DROP COLUMN starid;")
            self.conn.commit()

    #----------------------------------------------------------------------------------------------
    def _reset_unfinished(self, cleanup_constraints):
        """Reset tasks with status STARTED, ABORT or ERROR (optionally constrained) for re-running."""
        # We are re-running all with error, in the hope that they will work this time around:
        constraints = [f'status IN ({STATUS.STARTED.value:d},{STATUS.ABORT.value:d},{STATUS.ERROR.value:d})']

        # Add additional constraints from the user input and build SQL query:
        if cleanup_constraints:
            if isinstance(cleanup_constraints, dict):
                constraints += build_constraints(**cleanup_constraints, return_list=True)
            else:
                constraints += cleanup_constraints

        where = ' AND '.join(constraints)
        self.cursor.execute("BEGIN TRANSACTION;")
        self.cursor.execute("DELETE FROM diagnostics WHERE priority IN (SELECT todolist.priority FROM todolist WHERE " + where + ");")
        self.cursor.execute("UPDATE todolist SET status=NULL WHERE " + where + ";")
        self.conn.commit()

    #----------------------------------------------------------------------------------------------
    def _init_summary(self):
        """Create the summary dictionary and, if a summary file is requested, fill and write it."""
        self.summary = {
            'slurm_jobid': os.environ.get('SLURM_JOB_ID', None),
            'numtasks': 0,
            'tasks_run': 0,
            'last_error': None,
            'mean_elaptime': None,
            'mean_worker_waittime': None
        }
        # Make sure to add all the different status to summary:
        for s in STATUS:
            self.summary[s.name] = 0

        # If we are going to output summary, make sure to fill it up:
        if self.summary_file:
            # Ensure it is an absolute file path:
            self.summary_file = os.path.abspath(self.summary_file)
            # Extract information from database:
            self.cursor.execute("SELECT status,COUNT(*) AS cnt FROM todolist GROUP BY status;")
            for row in self.cursor.fetchall():
                self.summary['numtasks'] += row['cnt']
                if row['status'] is not None:
                    self.summary[STATUS(row['status']).name] = row['cnt']
            # Make sure the containing directory exists:
            os.makedirs(os.path.dirname(self.summary_file), exist_ok=True)
            self.write_summary()

    #----------------------------------------------------------------------------------------------
    def _vacuum(self):
        """Run VACUUM on the database to clean it up before the run."""
        self.logger.info("Cleaning TODOLIST before run...")
        # VACUUM can not run inside a transaction, so temporarily switch to autocommit:
        tmp_isolevel = self.conn.isolation_level
        try:
            self.conn.isolation_level = None
            self.cursor.execute("VACUUM;")
        finally:
            self.conn.isolation_level = tmp_isolevel

    #----------------------------------------------------------------------------------------------
    def backup(self):
        """
        Save backup of todo-file to disk.

        This only has an effect when `load_into_memory` is enabled. In that case the in-memory
        database is written to a temporary file next to the TODO-file, which then replaces the
        TODO-file. The counter of results saved since the last backup is always reset.

        .. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
        """
        self._results_saved_counter = 0
        if not self.load_into_memory:
            return

        fd, backupfile = tempfile.mkstemp(
            dir=os.path.dirname(self.todo_file),
            prefix=os.path.basename(self.todo_file) + '-backup-')
        os.close(fd)
        try:
            with contextlib.closing(sqlite3.connect(backupfile)) as dest:
                self.conn.backup(dest)
                dest.execute("PRAGMA journal_mode=DELETE;")
                dest.execute('PRAGMA synchronous=NORMAL;')
                dest.commit()
            # mkstemp creates the file readable/writable only by the owner;
            # keep the permissions of the TODO-file which is about to be replaced:
            with contextlib.suppress(OSError):
                os.chmod(backupfile, os.stat(self.todo_file).st_mode & 0o7777)
        except BaseException:
            # Do not leave a half-written backup file behind:
            with contextlib.suppress(OSError):
                os.remove(backupfile)
            raise

        # Since we are running from memory, the original file
        # is not opened by any process, so we are free to
        # replace it:
        try:
            os.replace(backupfile, self.todo_file)
        except PermissionError: # pragma: no cover
            self.logger.exception('Could not overwrite original file. Backup saved as: %s', backupfile)

    #----------------------------------------------------------------------------------------------
    def close(self):
        """
        Close TaskManager and all associated objects.

        Any open transaction is rolled back, a final :meth:`backup` is made, the database
        connection is closed and the summary file is written. If the connection has
        already been closed, the database steps are skipped.
        """
        if hasattr(self, 'cursor') and hasattr(self, 'conn'):
            try:
                self.conn.rollback()
                self.cursor.execute("PRAGMA journal_mode=DELETE;")
                self.cursor.execute('PRAGMA synchronous=NORMAL;')
                self.conn.commit()
                self.cursor.close()
                self.backup()
            except sqlite3.ProgrammingError:
                # Raised when operating on an already closed connection:
                pass
        if hasattr(self, 'conn'):
            self.conn.close()
        self.write_summary()

    #----------------------------------------------------------------------------------------------
    def __enter__(self):
        return self

    #----------------------------------------------------------------------------------------------
    def __exit__(self, *args):
        self.close()

    #----------------------------------------------------------------------------------------------
    def __del__(self):
        # The summary file is deliberately not written during garbage collection:
        self.summary_file = None
        self.close()

    #----------------------------------------------------------------------------------------------
    def get_number_tasks(self, **kwargs):
        """
        Get number of tasks due to be processed.

        Parameters:
            **kwargs: Keyword arguments are passed to :func:`build_constraints`.

        Returns:
            int: Number of tasks due to be processed.
        """
        constraints = build_constraints(**kwargs)
        self.cursor.execute("SELECT COUNT(*) AS num FROM todolist WHERE status IS NULL" + constraints + ";")
        return int(self.cursor.fetchone()['num'])

    #----------------------------------------------------------------------------------------------
    def get_task(self, **kwargs):
        """
        Get next task to be processed, i.e. the unprocessed task with the lowest priority value.

        Parameters:
            **kwargs: Keyword arguments are passed to :func:`build_constraints`.

        Returns:
            dict or None: Dictionary of settings for task, or None if no task is left.
        """
        constraints = build_constraints(**kwargs)
        self.cursor.execute("SELECT " + self._TASK_COLUMNS + " FROM todolist WHERE status IS NULL" + constraints + " ORDER BY priority LIMIT 1;")
        task = self.cursor.fetchone()
        return None if task is None else dict(task)

    #----------------------------------------------------------------------------------------------
    def get_random_task(self):
        """
        Get random task to be processed.

        Returns:
            dict or None: Dictionary of settings for task, or None if no task is left.
        """
        self.cursor.execute("SELECT " + self._TASK_COLUMNS + " FROM todolist WHERE status IS NULL ORDER BY RANDOM() LIMIT 1;")
        task = self.cursor.fetchone()
        return None if task is None else dict(task)

    #----------------------------------------------------------------------------------------------
    def start_task(self, taskid):
        """
        Mark a task as STARTED in the TODO-list.

        Parameters:
            taskid (int): ID (priority) of the task to be marked as STARTED.
        """
        self.cursor.execute(f"UPDATE todolist SET status={STATUS.STARTED.value:d} WHERE priority=?;", [taskid])
        self.conn.commit()
        self.summary['STARTED'] += 1

    #----------------------------------------------------------------------------------------------
    def save_result(self, result):
        """
        Save results and diagnostics. This will update the TODO list.

        If ``result['details']['skip_targets']`` is non-empty, either the other targets are
        marked as SKIPPED (when this target is the brightest among them), or this target is
        itself marked as SKIPPED. The cause is recorded in the ``photometry_skipped`` table.

        Parameters:
            result (dict): Dictionary of results and diagnostics. Must contain ``priority``,
                ``status``, ``method_used`` and ``time``, and may contain ``details`` and
                ``worker_wait_time``. When targets are skipped, ``datasource``, ``sector``,
                ``camera``, ``ccd``, ``cadence``, ``starid`` and ``tmag`` are also used.
        """
        # Extract details dictionary:
        details = result.get('details', {})
        # Copy the error messages, so appending below does not modify the caller's list:
        error_msg = list(details.get('errors') or [])

        # The status of this target returned by the photometry:
        my_status = result['status']

        # Extract stamp width and height:
        stamp = details.get('stamp', None)
        stamp_width = None if stamp is None else stamp[3] - stamp[2]
        stamp_height = None if stamp is None else stamp[1] - stamp[0]

        pos_centroid = details.get('pos_centroid')
        if pos_centroid is None:
            pos_centroid = (None, None)

        # Make changes to database:
        additional_skipped = 0
        self.cursor.execute("BEGIN TRANSACTION;")
        try:
            # Also set status of targets that were marked as "SKIPPED" by this target:
            skip_targets = details.get('skip_targets')
            if skip_targets is not None and len(skip_targets) > 0:
                skip_targets = set(skip_targets)

                if result['datasource'].startswith('tpf:') and int(result['datasource'][4:]) in skip_targets:
                    # This secondary target is in the mask of the primary target.
                    # We never want to return a lightcurve for a secondary target over
                    # a primary target, so we are going to mark this one as SKIPPED.
                    primary_tpf_target_starid = int(result['datasource'][4:])
                    self.cursor.execute("SELECT priority FROM todolist WHERE starid=? AND datasource='tpf' AND sector=? AND camera=? AND ccd=? AND cadence=?;", (
                        primary_tpf_target_starid,
                        result['sector'],
                        result['camera'],
                        result['ccd'],
                        result['cadence']
                    ))
                    primary_tpf_target_priority = self.cursor.fetchone()

                    # Mark the current star as SKIPPED and that it was caused by the primary:
                    self.logger.info("Changing status to SKIPPED for priority %s because it overlaps with primary target TIC %d", result['priority'], primary_tpf_target_starid)
                    my_status = STATUS.SKIPPED
                    if primary_tpf_target_priority is not None:
                        self.cursor.execute("INSERT INTO photometry_skipped (priority,skipped_by) VALUES (?,?);", (
                            result['priority'],
                            primary_tpf_target_priority[0]
                        ))
                    else:
                        self.logger.warning("Could not find primary TPF target (TIC %d) for priority=%d", primary_tpf_target_starid, result['priority'])
                        error_msg.append("TargetNotFoundError: Could not find primary TPF target (TIC %d)" % primary_tpf_target_starid)

                else:
                    # Ask the todolist if there are any stars that are brighter than this
                    # one among the other targets in the mask:
                    skip_rows = self._find_skip_candidates(result, skip_targets)
                    if len(skip_rows) > 0:
                        skip_tmags = np.array([row['tmag'] for row in skip_rows])
                        if np.all(result['tmag'] < skip_tmags):
                            # This target was the brightest star in the mask,
                            # so let's keep it and simply mark all the other targets
                            # as SKIPPED:
                            self.cursor.execute("DELETE FROM photometry_skipped WHERE skipped_by=?;", (result['priority'],))
                            for row in skip_rows:
                                self.cursor.execute(f"UPDATE todolist SET status={STATUS.SKIPPED.value:d} WHERE priority=?;", [
                                    row['priority']
                                ])
                                additional_skipped += self.cursor.rowcount
                                self.cursor.execute("INSERT INTO photometry_skipped (priority,skipped_by) VALUES (?,?);", (
                                    row['priority'],
                                    result['priority']
                                ))
                        else:
                            # This target was not the brightest star in the mask,
                            # and a brighter target is going to be processed,
                            # so let's change this one to SKIPPED and let the other
                            # one run later on
                            self.logger.info("Changing status to SKIPPED for priority %s", result['priority'])
                            my_status = STATUS.SKIPPED

                            # Mark that the brightest star among the skip-list is the reason for
                            # for skipping this target:
                            self.cursor.execute("INSERT INTO photometry_skipped (priority,skipped_by) VALUES (?,?);", (
                                result['priority'],
                                skip_rows[np.argmin(skip_tmags)]['priority']
                            ))

            # Convert error messages from list to string or None:
            error_msg = '\n'.join(error_msg) if error_msg else None

            # Update the status in the TODO list:
            self.cursor.execute("UPDATE todolist SET status=? WHERE priority=?;", (
                my_status.value,
                result['priority']
            ))

            self.cursor.execute("INSERT OR REPLACE INTO diagnostics (priority, lightcurve, method_used, elaptime, worker_wait_time, pos_column, pos_row, mean_flux, variance, variability, rms_hour, ptp, mask_size, edge_flux, contamination, stamp_width, stamp_height, stamp_resizes, errors) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);", (
                result['priority'],
                details.get('filepath_lightcurve', None),
                result['method_used'],
                result['time'],
                result.get('worker_wait_time', None),
                pos_centroid[0],
                pos_centroid[1],
                details.get('mean_flux', None),
                details.get('variance', None),
                details.get('variability', None),
                details.get('rms_hour', None),
                details.get('ptp', None),
                details.get('mask_size', None),
                details.get('edge_flux', None),
                details.get('contamination', None),
                stamp_width,
                stamp_height,
                details.get('stamp_resizes', 0),
                error_msg
            ))
            self.conn.commit()
        except BaseException: # pragma: no cover
            self.conn.rollback()
            raise

        # Update the summary dictionary with the status:
        self.summary['tasks_run'] += 1
        self.summary[my_status.name] += 1
        self.summary['STARTED'] -= 1
        self.summary['SKIPPED'] += additional_skipped

        # Store the last error message in summary:
        if error_msg:
            self.summary['last_error'] = error_msg

        # Calculate mean elapsed time using "streaming weighted mean" with (alpha=0.1):
        # https://dev.to/nestedsoftware/exponential-moving-average-on-streaming-data-4hhl
        if self.summary['mean_elaptime'] is None:
            self.summary['mean_elaptime'] = result['time']
        else:
            self.summary['mean_elaptime'] += 0.1 * (result['time'] - self.summary['mean_elaptime'])

        # Same streaming weighted mean for the time the worker spent waiting:
        if result.get('worker_wait_time') is not None:
            if self.summary['mean_worker_waittime'] is None:
                self.summary['mean_worker_waittime'] = result['worker_wait_time']
            else:
                self.summary['mean_worker_waittime'] += 0.1 * (result['worker_wait_time'] - self.summary['mean_worker_waittime'])

        # Write summary file:
        self._tick_summary()

        # Backup every X results:
        self._results_saved_counter += 1
        if self.backup_interval is not None and self._results_saved_counter >= self.backup_interval:
            self.backup()

    #----------------------------------------------------------------------------------------------
    def _find_skip_candidates(self, result, skip_targets):
        """
        Find the todolist rows (priority, tmag) of the targets in ``skip_targets`` which have the
        same datasource, sector, camera, CCD and cadence as ``result``. For a primary TPF
        target (datasource ``'tpf'``), its secondary targets (``'tpf:<starid>'``) are included.
        """
        starids = [int(starid) for starid in skip_targets]
        if result['datasource'] == 'tpf':
            datasources = ['tpf', 'tpf:%d' % result['starid']]
        else:
            datasources = [result['datasource']]

        # Use placeholders for all values, so no value is ever spliced into the SQL text:
        query = ("SELECT priority,tmag FROM todolist WHERE starid IN ({0}) AND datasource IN ({1}) "
            "AND sector=? AND camera=? AND ccd=? AND cadence=?;").format(
            ','.join('?' * len(starids)),
            ','.join('?' * len(datasources)))
        self.cursor.execute(query, starids + datasources + [
            result['sector'],
            result['camera'],
            result['ccd'],
            result['cadence']
        ])
        return self.cursor.fetchall()

    #----------------------------------------------------------------------------------------------
    def _tick_summary(self):
        """Count a saved result and write the summary file once ``summary_interval`` is reached."""
        self.summary_counter += 1
        # A summary_interval of None disables the periodic writes:
        if self.summary_file and self.summary_interval is not None and self.summary_counter >= self.summary_interval:
            self.summary_counter = 0
            self.write_summary()

    #----------------------------------------------------------------------------------------------
    def write_summary(self):
        """
        Write summary of progress to file. The summary file will be in JSON format.

        Nothing is done if no summary file is set or the summary has not been created yet.
        Errors while writing are logged, not raised.
        """
        if not getattr(self, 'summary_file', None) or not hasattr(self, 'summary'):
            return
        try:
            # Serialize before opening the file, so a serialization error does not
            # leave a truncated summary file behind:
            content = json.dumps(self.summary)
            with open(self.summary_file, 'w') as fid:
                fid.write(content)
        except Exception: # pragma: no cover
            self.logger.exception("Could not write summary file")