From 65b8a89ac5568f6410ae35f5bcb9fd8c0cef78a0 Mon Sep 17 00:00:00 2001
From: Gareth S Cabourn Davies
Date: Wed, 24 Jul 2024 15:08:39 +0100
Subject: [PATCH] Add a script to supervise template fitting in pycbc live
 (#4813)

* Move code which will be shared into a module
* getting more things ready for the trigger fits / dq supervision
* start work on fit_by, fit_over and DQ trigger supervision
* some work - bring in latest trigger file finding
* updates to use latest trigger file finding
* pass cuts through to the collation script
* symlink fit_over_multiparam output to variable file
* remove unused imports, argument ordering, plot titles
* CC
* Fix docstring, formatting
* Move the trigger collation/fitting supervisor to use configparser
* start moving significance fit supervision to configparser
* Start putting template fits into supervision code shared with significance fits
* Deal properly with the case when there are no triggers, so we can't calculate the median_sigma
* Remove no-longer-valid efficiency savings
* Some minor fixes for when sending mail from supervision
* Move to a single supervision file for both trigger and significance fits
* Remove FIXMEs
* Use check=True with subprocess.run
* Remove function checking fit coefficients - it is now unclear what the 'safe' ranges should be for the various different stages
* Fix minor bug
* typo
* Some minor fixes and TDC comments
* CC
---
 .../pycbc_fit_sngls_by_template               |   3 +
 .../pycbc_fit_sngls_over_multiparam           |  10 +-
 .../pycbc_live_plot_single_significance_fits  |   4 +-
 bin/live/pycbc_live_single_significance_fits  |   7 -
 ...pycbc_live_supervise_collated_trigger_fits | 634 ++++++++++++++++++
 ...bc_live_supervise_single_significance_fits | 521 --------------
 bin/plotting/pycbc_plot_bank_corner           |   4 -
 pycbc/live/__init__.py                        |   1 +
 pycbc/live/supervision.py                     | 154 +++++
 9 files changed, 802 insertions(+), 536 deletions(-)
 create mode 100755 bin/live/pycbc_live_supervise_collated_trigger_fits
 delete mode 100755 bin/live/pycbc_live_supervise_single_significance_fits
 create mode 100644 pycbc/live/supervision.py

diff --git a/bin/all_sky_search/pycbc_fit_sngls_by_template b/bin/all_sky_search/pycbc_fit_sngls_by_template
index 83b55ff5676..5e7d31e87e3 100755
--- a/bin/all_sky_search/pycbc_fit_sngls_by_template
+++ b/bin/all_sky_search/pycbc_fit_sngls_by_template
@@ -352,6 +352,9 @@ sigma_regions = trigf[args.ifo + '/sigmasq_template'][:]
 median_sigma = []
 for reg in sigma_regions:
     strigs = trigf[args.ifo + '/sigmasq'][reg]
+    if len(strigs) == 0:
+        median_sigma.append(np.nan)
+        continue
     median_sigma.append(np.median(strigs) ** 0.5)
 
 outfile = HFile(args.output, 'w')
diff --git a/bin/all_sky_search/pycbc_fit_sngls_over_multiparam b/bin/all_sky_search/pycbc_fit_sngls_over_multiparam
index 4230efd71ef..45b46fa32e3 100755
--- a/bin/all_sky_search/pycbc_fit_sngls_over_multiparam
+++ b/bin/all_sky_search/pycbc_fit_sngls_over_multiparam
@@ -342,14 +342,20 @@ if len(args.template_fit_file) > 1:
     nasum = nabove[tidsort].cumsum()
     invsum = invalphan[tidsort].cumsum()
     ntsum = ntotal[tidsort].cumsum()
-    mssum = median_sigma[tidsort].cumsum()
 
     num = right - left
     tid = tid_unique
     nabove = (nasum[right] - nasum[left]) / num
     invalphan = (invsum[right] - invsum[left]) / num
     ntotal = (ntsum[right] - ntsum[left]) / num
-    median_sigma = (mssum[right] - mssum[left]) / num
+    if median_sigma is not None:
+        # Median sigma is a special one - we need to make sure that
+        # we do not mess things up when nan values are given, so we
+        # can't use the special cumsum fast option
+        median_sigma = [
+            numpy.nanmean(median_sigma[tidsort[l:r]])
+            for l, r in zip(left, right)
+        ]
 
     if args.output_fits_by_template:
         # Store fit_by_template values for output file
diff --git a/bin/live/pycbc_live_plot_single_significance_fits b/bin/live/pycbc_live_plot_single_significance_fits
index 5010066856a..3dc2414a297 100644
--- a/bin/live/pycbc_live_plot_single_significance_fits
+++ b/bin/live/pycbc_live_plot_single_significance_fits
@@ -131,10 +131,10 @@ for ifo in all_ifos:
         continue
 
     # Keep track of some maxima for use in setting the plot limits
-    maxstat = stats[ifo].max()
+    maxstat = np.nanmax(stats[ifo])
     max_rate = 0
 
-    statrange = maxstat - max(stats[ifo].min(), fit_threshold[ifo])
+    statrange = maxstat - max(np.nanmin(stats[ifo]), fit_threshold[ifo])
     plotmax = maxstat + statrange * 0.05
     plotbins = np.linspace(fit_threshold[ifo], plotmax, 400)
diff --git a/bin/live/pycbc_live_single_significance_fits b/bin/live/pycbc_live_single_significance_fits
index d122d6b2787..b1e76c32612 100644
--- a/bin/live/pycbc_live_single_significance_fits
+++ b/bin/live/pycbc_live_single_significance_fits
@@ -98,13 +98,6 @@ args.trigger_cuts = args.trigger_cuts or []
 args.trigger_cuts.append(f"end_time:{args.gps_start_time}:lower_inc")
 args.trigger_cuts.append(f"end_time:{args.gps_end_time}:upper_inc")
 
-# Efficiency saving: add SNR cut before any others as sngl_ranking can
-# only be less than SNR.
-args.trigger_cuts.insert(0, f"snr:{args.fit_threshold}:lower_inc")
-
-# Cut triggers with sngl-ranking below threshold
-args.trigger_cuts.append(f"{args.sngl_ranking}:{args.fit_threshold}:lower_inc")
-
 trigger_cut_dict, template_cut_dict = cuts.ingest_cuts_option_group(args)
 
 logging.info("Setting up duration bins")
diff --git a/bin/live/pycbc_live_supervise_collated_trigger_fits b/bin/live/pycbc_live_supervise_collated_trigger_fits
new file mode 100755
index 00000000000..3206f549c44
--- /dev/null
+++ b/bin/live/pycbc_live_supervise_collated_trigger_fits
@@ -0,0 +1,634 @@
+#!/usr/bin/env python
+
+"""Supervise the periodic re-fitting of PyCBC Live single-detector triggers,
+and the associated plots.
+"""
+
+import re
+import logging
+import argparse
+from datetime import datetime, timedelta
+from dateutil.relativedelta import relativedelta
+import os
+import shutil
+import subprocess
+import numpy as np
+
+import lal
+from lal import gpstime
+
+import pycbc
+from pycbc.io.hdf import HFile
+from pycbc.live import supervision as sv
+from pycbc.types.config import InterpolatingConfigParser as icp
+
+
+def read_options(args):
+    """
+    Read the options into a dictionary
+    """
+    logging.info("Reading config file")
+    cp = icp(configFiles=[args.config_file])
+    config_opts = {
+        section: {k: v for k, v in cp[section].items()}
+        for section in cp.sections()
+    }
+    del config_opts['environment']
+    return config_opts
+
+
+def trigger_collation(
+        day_dt,
+        day_str,
+        collation_control_options,
+        collation_options,
+        output_dir,
+        controls
+    ):
+    """
+    Perform the trigger collation as specified
+    """
+    logging.info("Performing trigger collation")
+    collate_args = [
+        'pycbc_live_collate_triggers',
+    ]
+    collate_args += sv.dict_to_args(collation_options)
+    gps_start = gpstime.utc_to_gps(day_dt).gpsSeconds
+    gps_end = gpstime.utc_to_gps(day_dt + timedelta(days=1)).gpsSeconds
+
+    trig_merge_file = os.path.join(
+        output_dir,
+        collation_control_options['collated-triggers-format'].format(
+            ifos=''.join(controls['ifos'].split()),
+            start=gps_start,
+            duration=(gps_end - gps_start)
+        )
+    )
+    collate_args += [
+        '--gps-start-time', f'{gps_start:d}',
+        '--gps-end-time', f'{gps_end:d}',
+        '--output-file', trig_merge_file,
+    ]
+
+    sv.run_and_error(collate_args, controls)
+
+    return trig_merge_file
+
+
+def fit_by_template(
+        trigger_merge_file,
+        day_str,
+        fbt_control_options,
+        fbt_options,
+        output_dir,
+        ifo,
+        controls
+    ):
+    """
+    Supervise the running of pycbc_fit_sngls_by_template on live triggers
+    """
+    logging.info("Performing daily fit_by_template")
+    fbt_out_fname = fbt_control_options['fit-by-template-format'].format(
+        date=day_str,
+        ifo=ifo,
+    )
+    fbt_out_full = os.path.join(output_dir, fbt_out_fname)
+    fit_by_args = ['pycbc_fit_sngls_by_template']
+    fit_by_args += ['--trigger-file', trigger_merge_file]
+    fit_by_args += sv.dict_to_args(fbt_options)
+    fit_by_args += ['--output', fbt_out_full, '--ifo', ifo]
+    sv.run_and_error(fit_by_args, controls)
+
+    return fbt_out_full, day_str
+
+
+def find_daily_fit_files(
+        combined_control_options,
+        daily_fname_format,
+        daily_files_dir,
+        ifo=None
+    ):
+    """
+    Find files which match the specified formats
+    """
+    log_str = f"Finding files in {daily_files_dir} with format {daily_fname_format}"
+    if ifo is not None:
+        log_str += f" in detector {ifo}"
+    logging.info(log_str)
+    combined_days = int(combined_control_options['combined-days'])
+    if 'replay-start-time' in combined_control_options:
+        replay_start_time = int(combined_control_options['replay-start-time'])
+        true_start_time = int(combined_control_options['true-start-time'])
+        replay_duration = int(combined_control_options['replay-duration'])
+        rep_start_utc = lal.GPSToUTC(replay_start_time)[0:6]
+
+        dt_replay_start = datetime(
+            year=rep_start_utc[0],
+            month=rep_start_utc[1],
+            day=rep_start_utc[2],
+            hour=rep_start_utc[3],
+            minute=rep_start_utc[4],
+            second=rep_start_utc[5]
+        )
+
+        td = (day_dt - dt_replay_start).total_seconds()
+
+        # Time since the start of this replay
+        time_since_replay = np.remainder(td, replay_duration)
+
+        # Add this on to the original start time to get the current time of
+        # the replay data
+        true_utc = lal.GPSToUTC(true_start_time)[0:6]
+        dt_true_start = datetime(
+            year=true_utc[0],
+            month=true_utc[1],
+            day=true_utc[2],
+            hour=true_utc[3],
+            minute=true_utc[4],
+            second=true_utc[5]
+        )
+        # Original time of the data being replayed right now
+        current_date = dt_true_start + timedelta(seconds=time_since_replay)
+    else:
+        current_date = day_dt
+
+    date_test = current_date + timedelta(days=1)
+
+    daily_files = []
+    missed_files = 0
+    # Maximum consecutive number of days between files before a warning is raised
+    # 10 days of the detector being off would be unusual for current detectors
+    max_nmissed = int(combined_control_options.get('maximum-missed-files', 10))
+    found_files = 0
+    while found_files < combined_days and missed_files < max_nmissed:
+        # Loop through the possible file locations and see if the file exists
+        date_test -= timedelta(days=1)
+        date_out = date_test.strftime("%Y_%m_%d")
+        daily_fname = daily_fname_format.format(
+            date=date_out,
+            ifo=ifo,
+        )
+        output_dir = os.path.join(
+            daily_files_dir,
+            date_out
+        )
+        daily_full = os.path.join(
+            output_dir,
+            daily_fname
+        )
+        # Check that the file exists:
+        if not os.path.exists(daily_full):
+            missed_files += 1
+            logging.info("File %s does not exist - skipping", daily_full)
+            continue
+        if not len(daily_files):
+            end_date = date_out
+        # This is now the oldest file
+        first_date = date_out
+        # reset the "missed files" counter, and add to the "found files"
+        missed_files = 0
+        found_files += 1
+        daily_files.append(daily_full)
+
+    if found_files == 0:
+        raise RuntimeError("No files found")
+
+    if missed_files == max_nmissed:
+        # If more than a set maximum days between files, something
+        # is wrong with the analysis. Warn about this and use fewer
+        # files
+        logging.warning(
+            f'More than {max_nmissed} days between files, only using '
+            f'{found_files} files!'
+        )
+
+    return daily_files, first_date, end_date
+
+
+def fit_over_multiparam(
+        fit_over_controls,
+        fit_over_options,
+        ifo,
+        day_str,
+        output_dir,
+        controls
+    ):
+    """
+    Supervise the smoothing of live trigger fits using
+    pycbc_fit_sngls_over_multiparam
+    """
+    daily_files, first_date, end_date = find_daily_fit_files(
+        fit_over_controls,
+        fit_over_controls['fit-by-format'],
+        controls['output-directory'],
+        ifo=ifo
+    )
+    logging.info(
+        "Smoothing fits using fit_over_multiparam with %d files and "
+        "specified parameters",
+        len(daily_files)
+    )
+    file_id_str = f'{first_date}-{end_date}'
+    out_fname = fit_over_controls['fit-over-format'].format(
+        dates=file_id_str,
+        ifo=ifo,
+    )
+
+    fit_over_args = ['pycbc_fit_sngls_over_multiparam', '--template-fit-file']
+    fit_over_args += daily_files
+    fit_over_args += sv.dict_to_args(fit_over_options)
+    fit_over_full = os.path.join(output_dir, out_fname)
+    fit_over_args += ['--output', fit_over_full]
+    sv.run_and_error(fit_over_args, controls)
+    if 'variable-fit-over-param' in fit_over_controls:
+        variable_fits = fit_over_controls['variable-fit-over-param'].format(
+            ifo=ifo
+        )
+        sv.symlink(fit_over_full, variable_fits)
+
+    return fit_over_full, file_id_str
+
+
+def plot_fits(
+        fits_file,
+        ifo,
+        day_title_str,
+        plot_fit_options,
+        controls,
+        smoothed=False
+    ):
+    """Plotting for fit_by files, and linking to the public directory"""
+    fits_plot_output = fits_file[:-3] + 'png'
+    logging.info(
+        "Plotting template fits %s to %s",
+        fits_file,
+        fits_plot_output
+    )
+    fits_plot_arguments = [
+        'pycbc_plot_bank_corner',
+        '--fits-file',
+        fits_file,
+        '--output-plot-file',
+        fits_plot_output,
+    ]
+    fits_plot_arguments += sv.dict_to_args(plot_fit_options)
+
+    title = "Fit parameters for pycbc-live, triggers from " + day_title_str
+    if smoothed:
+        title += ', smoothed'
+    fits_plot_arguments += ['--title', title]
+    sv.run_and_error(fits_plot_arguments, controls)
+    if 'public-dir' in controls:
+        public_dir = os.path.abspath(os.path.join(
+            controls['public-dir'],
+            *day_str.split('_')
+        ))
+        sv.symlink(fits_plot_output, public_dir)
+
+
+def single_significance_fits(
+        daily_controls,
+        daily_options,
+        output_dir,
+        day_str,
+        day_dt,
+        controls,
+        test_options,
+        stat_files=None,
+    ):
+    """
+    Supervise the significance fits for live triggers using
+    pycbc_live_single_significance_fits
+    """
+    daily_options['output'] = os.path.join(
+        output_dir,
+        daily_controls['sig-daily-format'].format(date=day_str),
+    )
+    daily_args = ['pycbc_live_single_significance_fits']
+
+    gps_start_time = gpstime.utc_to_gps(day_dt).gpsSeconds
+    gps_end_time = gpstime.utc_to_gps(day_dt + timedelta(days=1)).gpsSeconds
+    daily_options['gps-start-time'] = f'{gps_start_time:d}'
+    daily_options['gps-end-time'] = f'{gps_end_time:d}'
+    daily_args += sv.dict_to_args(daily_options)
+    if stat_files is not None:
+        daily_args += ['--statistic-files'] + stat_files
+
+    sv.run_and_error(daily_args, controls)
+
+    return daily_options['output']
+
+
+def plot_single_significance_fits(daily_output, daily_plot_options, controls):
+    """
+    Plotting daily significance fits, and link to public directory if wanted
+    """
+    daily_plot_output = f'{daily_output[:-4]}_{{ifo}}.png'
+    logging.info(
+        "Plotting daily significance fits from %s to %s",
+        daily_output,
+        daily_plot_output
+    )
+    daily_plot_arguments = [
+        'pycbc_live_plot_single_significance_fits',
+        '--trigger-fits-file',
+        daily_output,
+        '--output-plot-name-format',
+        daily_plot_output,
+    ]
+    daily_plot_arguments += sv.dict_to_args(daily_plot_options)
+    sv.run_and_error(daily_plot_arguments, controls)
+
+    # Link the plots to the public-dir if wanted
+    if 'public-dir' in controls:
+        daily_plot_outputs = [
+            daily_plot_output.format(ifo=ifo)
+            for ifo in controls['ifos'].split()
+        ]
+        logging.info("Linking daily fits plots")
+        for dpo in daily_plot_outputs:
+            public_dir = os.path.abspath(os.path.join(
+                controls['public-dir'],
+                *day_str.split('_')
+            ))
+            sv.symlink(dpo, public_dir)
+
+
+def combine_significance_fits(
+        combined_options,
+        combined_controls,
+        output_dir,
+        day_str,
+        controls
+    ):
+    """
+    Supervise the smoothing of live trigger significance fits using
+    pycbc_live_combine_single_significance_fits
+    """
+    daily_files, first_date, end_date = find_daily_fit_files(
+        combined_controls,
+        combined_controls['daily-format'],
+        controls['output-directory'],
+    )
+    logging.info(
+        "Smoothing significance fits over %d files",
+        len(daily_files)
+    )
+    date_range = f'{first_date}-{end_date}'
+    outfile_name = combined_controls['outfile-format'].format(
+        date=day_str,
+        date_range=date_range,
+    )
+    combined_options['output'] = os.path.join(output_dir, outfile_name)
+    combined_options['trfits-files'] = ' '.join(daily_files)
+
+    combined_args = ['pycbc_live_combine_single_significance_fits']
+    combined_args += sv.dict_to_args(combined_options)
+
+    sv.run_and_error(combined_args, controls)
+
+    if 'variable-significance-fits' in combined_controls:
+        logging.info("Linking to variable significance fits file")
+        sv.symlink(
+            combined_options['output'],
+            combined_controls['variable-significance-fits']
+        )
+
+    return combined_options['output'], date_range
+
+
+def plot_combined_significance_fits(
+        csf_file,
+        date_range,
+        output_dir,
+        combined_plot_options,
+        combined_plot_control_options,
+        controls
+    ):
+    """
+    Plotting combined significance fits, and link to public directory if wanted
+    """
+
+    oput_fmt = combined_plot_control_options['output-plot-name-format']
+    if '{date_range}' not in oput_fmt:
+        raise RuntimeError(
+            "Must specify {date_range} in output-plot-name-format"
+        )
+    oput_fmt = oput_fmt.replace('{date_range}', date_range)
+    oput_full = os.path.join(output_dir, oput_fmt)
+    combined_plot_arguments = [
+        'pycbc_live_plot_combined_single_significance_fits',
+        '--combined-fits-file',
+        csf_file,
+        '--output-plot-name-format',
+        oput_full
+    ]
+    combined_plot_arguments += sv.dict_to_args(combined_plot_options)
+
+    sv.run_and_error(combined_plot_arguments, controls)
+
+    # Get the list of combined plotting output files:
+    combined_plot_outputs = [
+        oput_full.format(ifo=ifo, type='fit_coeffs') for ifo in
+        controls['ifos'].split()
+    ]
+    combined_plot_outputs += [
+        oput_full.format(ifo=ifo, type='counts') for ifo in
+        controls['ifos'].split()
+    ]
+
+    if 'public-dir' in controls:
+        logging.info("Linking combined fits to public dir")
+        public_dir = os.path.abspath(os.path.join(
+            controls['public-dir'],
+            *day_str.split('_')
+        ))
+        for cpo in combined_plot_outputs:
+            sv.symlink(cpo, public_dir)
+
+
+def supervise_collation_fits_dq(args, day_dt, day_str):
+    """
+    Perform the trigger collation and fits etc. as specified
+    """
+    # Read in the config file and pack into appropriate dictionaries
+    config_opts = read_options(args)
+    controls = config_opts['control']
+    collation_options = config_opts['collation']
+    collation_control_options = config_opts['collation_control']
+    fit_by_template_options = config_opts['fit_by_template']
+    fit_by_template_control_options = config_opts['fit_by_template_control']
+    fit_over_options = config_opts['fit_over_multiparam']
+    fit_over_control_options = config_opts['fit_over_multiparam_control']
+    plot_fit_options = config_opts['plot_fit']
+    daily_options = config_opts['significance_daily_fits']
+    daily_control_options = config_opts['significance_daily_fits_control']
+    daily_plot_options = config_opts['plot_significance_daily']
+    combined_options = config_opts['significance_combined_fits']
+    combined_control_options = config_opts['significance_combined_fits_control']
+    combined_plot_options = config_opts['plot_significance_combined']
+    combined_plot_control_options = config_opts['plot_significance_combined_control']
+    test_options = config_opts['test']
+
+    # The main output directory will have a date subdirectory which we
+    # put the output into
+    sv.ensure_directories(controls, day_str)
+
+    ifos = controls['ifos'].split()
+    output_dir = os.path.join(
+        controls['output-directory'],
+        day_str
+    )
+    logging.info("Outputs to %s", output_dir)
+    if 'public-dir' in controls:
+        public_dir = os.path.abspath(os.path.join(
+            controls['public-dir'],
+            *day_str.split('_')
+        ))
+        logging.info("Outputs to be linked to %s", public_dir)
+
+    merged_triggers = trigger_collation(
+        day_dt,
+        day_str,
+        collation_control_options,
+        collation_options,
+        output_dir,
+        controls
+    )
+    # Store the locations of files needed for the statistic
+    stat_files = []
+    for ifo in ifos:
+        if args.fit_by_template:
+            fbt_file, date_str = fit_by_template(
+                merged_triggers,
+                day_str,
+                fit_by_template_control_options,
+                fit_by_template_options,
+                output_dir,
+                ifo,
+                controls,
+            )
+            plot_fits(
+                fbt_file,
+                ifo,
+                date_str,
+                plot_fit_options,
+                controls
+            )
+
+        if args.fit_over_multiparam:
+            fom_file, date_str = fit_over_multiparam(
+                fit_over_control_options,
+                fit_over_options,
+                ifo,
+                day_str,
+                output_dir,
+                controls
+            )
+            stat_files.append(fom_file)
+            plot_fits(
+                fom_file,
+                ifo,
+                date_str,
+                plot_fit_options,
+                controls,
+                smoothed=True,
+            )
+
+    if args.single_significance_fits:
+        ssf_file = single_significance_fits(
+            daily_control_options,
+            daily_options,
+            output_dir,
+            day_str,
+            day_dt,
+            controls,
+            test_options,
+            stat_files=stat_files,
+        )
+        plot_single_significance_fits(
+            ssf_file,
+            daily_plot_options,
+            controls
+        )
+    if args.combine_significance_fits:
+        csf_file, date_str = combine_significance_fits(
+            combined_options,
+            combined_control_options,
+            output_dir,
+            day_str,
+            controls
+        )
+        plot_combined_significance_fits(
+            csf_file,
+            date_str,
+            output_dir,
+            combined_plot_options,
+            combined_plot_control_options,
+            controls
+        )
+
+
+def get_yesterday_date():
+    """ Get the date string for yesterday's triggers """
+    day_dt = datetime.utcnow() - timedelta(days=1)
+    day_dt = datetime.combine(day_dt, datetime.min.time())
+    day_str = day_dt.strftime('%Y_%m_%d')
+    return day_dt, day_str
+
+
+parser = argparse.ArgumentParser(description=__doc__)
+pycbc.add_common_pycbc_options(parser)
+parser.add_argument(
+    '--config-file',
+    required=True
+)
+parser.add_argument(
+    '--date',
+    help='Date to analyse, if not given, will analyse yesterday (UTC). '
+         'Format YYYY_MM_DD. Do not use if using --run-daily-at.'
+)
+parser.add_argument(
+    '--fit-by-template',
+    action='store_true',
+    help="Perform template fits calculation."
+)
+parser.add_argument(
+    '--fit-over-multiparam',
+    action='store_true',
+    help="Perform template fits smoothing."
+)
+parser.add_argument(
+    '--single-significance-fits',
+    action='store_true',
+    help="Perform daily singles significance fits."
+)
+parser.add_argument(
+    '--combine-significance-fits',
+    action='store_true',
+    help="Do combination of singles significance fits."
+)
+parser.add_argument(
+    '--run-daily-at',
+    metavar='HH:MM:SS',
+    help='Stay running and repeat the fitting daily at the given UTC hour.'
+)
+args = parser.parse_args()
+
+pycbc.init_logging(args.verbose, default_level=1)
+
+if args.run_daily_at is not None and args.date is not None:
+    parser.error('Cannot take --run-daily-at and --date at the same time')
+
+if args.run_daily_at is not None:
+    # keep running and repeat the fitting every day at the given hour
+    if not re.match('[0-9][0-9]:[0-9][0-9]:[0-9][0-9]', args.run_daily_at):
+        parser.error('--run-daily-at takes a UTC time in the format HH:MM:SS')
+    logging.info('Starting in daily run mode')
+    while True:
+        sv.wait_for_utc_time(args.run_daily_at)
+        day_dt, day_str = get_yesterday_date()
+        logging.info('==== Time to update the single fits, waking up ====')
+        supervise_collation_fits_dq(args, day_dt, day_str)
+else:
+    # run just once
+    if args.date:
+        day_str = args.date
+        day_dt = datetime.strptime(args.date, '%Y_%m_%d')
+    else:
+        day_dt, day_str = get_yesterday_date()
+    supervise_collation_fits_dq(args, day_dt, day_str)
diff --git a/bin/live/pycbc_live_supervise_single_significance_fits b/bin/live/pycbc_live_supervise_single_significance_fits
deleted file mode 100755
index 0c6561e9cea..00000000000
--- a/bin/live/pycbc_live_supervise_single_significance_fits
+++ /dev/null
@@ -1,521 +0,0 @@
-#!/usr/bin/env python
-
-"""Supervise the periodic re-fitting of PyCBC Live single-detector triggers,
-and the associated plots.
-"""
-
-import re
-import logging
-import argparse
-from datetime import datetime, timedelta
-from dateutil.relativedelta import relativedelta
-import time
-import copy
-import os
-import shutil
-import subprocess
-import numpy as np
-
-from lal import gpstime
-
-import pycbc
-from pycbc.io.hdf import HFile
-
-
-def symlink(target, link_name):
-    """Create a symbolic link replacing the destination and checking for
-    errors.
-    """
-    cp = subprocess.run([
-        'ln', '-sf', target, link_name
-    ])
-    if cp.returncode:
-        raise subprocess.SubprocessError(
-            f"Could not link plot {target} to {link_name}"
-        )
-
-
-def dict_to_args(opts_dict):
-    """
-    Convert an option dictionary into a list to be used by subprocess.run
-    """
-    dargs = []
-    for option in opts_dict.keys():
-        dargs.append('--' + option.strip())
-        value = opts_dict[option]
-        if len(value.split()) > 1:
-            # value is a list, append individually
-            for v in value.split():
-                dargs.append(v)
-        elif not value:
-            # option is a flag, do nothing
-            continue
-        else:
-            # Single value option - easy enough
-            dargs.append(value)
-    return dargs
-
-
-def mail_volunteers_error(controls, mail_body_lines, subject):
-    """
-    Email a list of people, defined by mail-volunteers-file
-    To be used for errors or unusual occurences
-    """
-    if 'mail_volunteers_file' not in controls:
-        logging.info(
-            "No file containing people to email, logging the error instead"
-        )
-        for line in mail_body_lines:
-            logging.warning(line)
-        return
-    with open(controls['mail_volunteers_file'], 'r') as mail_volunteers_file:
-        volunteers = [volunteer.strip() for volunteer in
-                      mail_volunteers_file.readlines()]
-    logging.info("Emailing %s with warnings", ' '.join(volunteers))
-    mail_command = [
-        'mail',
-        '-s',
-        subject
-    ]
-    mail_command += volunteers
-    mail_body = '\n'.join(mail_body_lines)
-    subprocess.run(mail_command, input=mail_body, text=True)
-
-
-def check_trigger_files(filenames, test_options, controls):
-    """
-    Check that the fit coefficients meet criteria set
-    """
-    coeff_upper_limit = float(test_options['upper-limit-coefficient'])
-    coeff_lower_limit = float(test_options['lower-limit-coefficient'])
-    warnings = []
-    warning_files = []
-    for filename in filenames:
-        warnings_thisfile = []
-        with HFile(filename, 'r') as trff:
-            ifos = [k for k in trff.keys() if not k.startswith('bins')]
-            fit_coeffs = {ifo: trff[ifo]['fit_coeff'][:] for ifo in ifos}
-            bins_upper = trff['bins_upper'][:]
-            bins_lower = trff['bins_lower'][:]
-            # Which bins have at least *some* triggers within the limit
-            use_bins = bins_lower > float(test_options['duration-bin-lower-limit'])
-            for ifo in ifos:
-                coeffs_above = fit_coeffs[ifo][use_bins] > coeff_upper_limit
-                coeffs_below = fit_coeffs[ifo][use_bins] < coeff_lower_limit
-                if not any(coeffs_above) and not any(coeffs_below):
-                    continue
-                # Problem - the fit coefficient is outside the limits
-                for bl, bu, fc in zip(bins_lower[use_bins], bins_upper[use_bins],
-                                      fit_coeffs[ifo][use_bins]):
-                    if fc < coeff_lower_limit or fc > coeff_upper_limit:
-                        warnings_thisfile.append(
-                            f"WARNING - {ifo} fit coefficient {fc:.3f} in bin "
-                            f"{bl}-{bu} outwith limits "
-                            f"{coeff_lower_limit}-{coeff_upper_limit}"
-                        )
-        if warnings_thisfile:
-            warning_files.append(filename)
-            warnings.append(warnings_thisfile)
-
-    if warnings:
-        # Some coefficients are outside the range
-        # Add the fact that this check failed in the logs
-        logging.warning("Extreme daily fits values found:")
-        mail_body_lines = ["Extreme daily fits values found:"]
-        for filename, filewarnings in zip(warning_files, warnings):
-            logging.warning(filename)
-            mail_body_lines.append(f"Values in {filename}")
-            for fw in filewarnings:
-                logging.warning(" " + fw)
-                mail_body_lines.append(" " + fw)
-        mail_volunteers_error(
-            controls,
-            mail_body_lines,
-            'PyCBC Live single trigger fits extreme value(s)'
-        )
-
-
-def run_and_error(command_arguments, controls):
-    """
-    Wrapper around subprocess.run to catch errors and send emails if required
-    """
-    logging.info("Running " + " ".join(command_arguments))
-    command_output = subprocess.run(command_arguments, capture_output=True)
-    if command_output.returncode:
-        error_contents = [' '.join(command_arguments),
-                          command_output.stderr.decode()]
-        mail_volunteers_error(
-            controls,
-            error_contents,
-            f"PyCBC live could not run {command_arguments[0]}"
-        )
-        err_msg = f"Could not run {command_arguments[0]}"
-        raise subprocess.SubprocessError(err_msg)
-
-
-# These are the option used to control the supervision, and will not be passed
-# to the subprocesses
-control_options = [
-    "check-daily-output",
-    "combined-days",
-    "mail-volunteers-file",
-    "output-directory",
-    "output-id-str",
-    "public-dir",
-    "replay-duration",
-    "replay-start-time",
-    "submit-dir",
-    "trfits-format",
-    "true-start-time",
-    "variable-trigger-fits",
-]
-
-# these are options which can be taken by both the daily fit code and the
-# combined fitting code
-options_both = ['ifos', 'verbose']
-
-# These options are only for the daily fit code
-daily_fit_options = [
-    'cluster',
-    'date-directories',
-    'duration-bin-edges',
-    'duration-bin-spacing',
-    'duration-from-bank',
-    'file-identifier',
-    'fit-function',
-    'fit-threshold',
-    'num-duration-bins',
-    'prune-loudest',
-    'prune-stat-threshold',
-    'prune-window',
-    'sngl-ranking',
-    'template-cuts',
-    'trigger-cuts',
-    'trigger-directory',
-]
-
-combined_fit_options = [
-    'conservative-percentile',
-]
-
-coeff_test_options = [
-    'duration-bin-lower-limit',
-    'lower-limit-coefficient',
-    'upper-limit-coefficient',
-]
-
-all_options = control_options + options_both + daily_fit_options \
-    + combined_fit_options + coeff_test_options
-
-
-def do_fitting(args, day_dt, day_str):
-    """
-    Perform the fits as specified
-    """
-    # Read in the config file and pack into appropriate dictionaries
-    daily_options = {}
-    combined_options = {}
-    test_options = {}
-    controls = {}
-
-    with open(args.config_file, 'r') as conf_file:
-        all_lines = conf_file.readlines()
-
-    for line in all_lines:
-        # Ignore whitespace and comments
-        line = line.strip()
-        if not line:
-            continue
-        if line.startswith(';'):
-            continue
-
-        option, value = line.split('=')
-        option = option.strip()
-        value = value.strip()
-
-        # If it is a control option, add to the controls dictionary
-        if option in control_options:
-            controls[option] = value
-
-        # If the option is not to control the input, then it is passed
-        # straight to the executable
-        if option in daily_fit_options or option in options_both:
-            daily_options[option] = value
-
-        if option in options_both or option in combined_fit_options:
-            combined_options[option] = value
-
-        if option in coeff_test_options:
-            test_options[option] = value
-
-        if option not in all_options:
-            logging.warning("Option %s unrecognised, ignoring", option)
-
-    # The main output directory will have a date subdirectory which we
-    # put the output into
-    output_dir = os.path.join(controls['output-directory'], day_str)
-    subprocess.run(['mkdir', '-p', output_dir])
-    if 'public-dir' in controls:
-        public_dir = os.path.join(controls['public-dir'], *day_str.split('_'))
-        subprocess.run(['mkdir', '-p', public_dir])
-
-    if not args.combine_only:
-        ##### DAILY FITTING #####
-        file_id_str = f'{day_str}'
-        if 'output-id-str' in controls:
-            file_id_str += f"-{controls['output-id-str']}"
-        out_fname = f'{file_id_str}-TRIGGER-FITS.hdf'
-        daily_options['output'] = os.path.join(output_dir, out_fname)
-        daily_args = ['pycbc_live_single_significance_fits']
-
-        daily_options['gps-start-time'] = f'{gpstime.utc_to_gps(day_dt).gpsSeconds:d}'
-        daily_options['gps-end-time'] = f'{gpstime.utc_to_gps(day_dt + timedelta(days=1)).gpsSeconds:d}'
-        daily_args += dict_to_args(daily_options)
-
-        run_and_error(daily_args, controls)
-
-        # Add plotting for daily fits, and linking to the public directory
-        logging.info("Plotting daily fits")
-        daily_plot_output = os.path.join(output_dir,
-                                         '{ifo}-' + f'{out_fname[:-3]}png')
-        daily_plot_arguments = [
-            'pycbc_live_plot_single_significance_fits',
-            '--trigger-fits-file',
-            daily_options['output'],
-            '--output-plot-name-format',
-            daily_plot_output,
-            '--log-colormap'
-        ]
-        run_and_error(daily_plot_arguments, controls)
-
-        # Link the plots to the public-dir if wanted
-        if 'public-dir' in controls:
-            daily_plot_outputs = [daily_plot_output.format(ifo=ifo) for ifo in
-                                  daily_options['ifos'].split()]
-            logging.info("Linking daily fits plots")
-            for dpo in daily_plot_outputs:
-                symlink(dpo, public_dir)
-
-    if args.daily_only:
-        if 'check-daily-output' in controls:
-            logging.info(
-                "Checking that fit coefficients above %s for bins above %ss",
-                test_options['lower-limit-coefficient'],
-                test_options['duration-bin-lower-limit']
-            )
-            check_trigger_files(
-                [daily_options['output']],
-                test_options,
-                controls
-            )
-        logging.info('Done')
-        exit()
-
-    ##### COMBINED FITTING #####
-    combined_days = int(controls['combined-days'])
-    if 'replay-start-time' in controls:
-        replay_start_time = int(controls['replay-start-time'])
-        true_start_time = int(controls['true-start-time'])
-        replay_duration = int(controls['replay-duration'])
-        dt_replay_start = gpstime.gps_to_utc(replay_start_time)
-
-        td = (day_dt - dt_replay_start).total_seconds()
-
-        # Time since the start of this replay
-        time_since_replay = np.remainder(td, replay_duration)
-
-        # Add this on to the original start time to get the current time of
-        # the replay data
-        dt_true_start = gpstime.gps_to_utc(true_start_time)
-
-        # Original time of the data being replayed right now
-        current_date = dt_true_start + timedelta(seconds=time_since_replay)
-    else:
-        current_date = day_dt
-
-    date_test = current_date + timedelta(days=1)
-
-    logging.info("Finding trigger fit files for combination")
-    if 'check-daily-output' in controls:
-        logging.info(
-            "Checking all files that fit coefficients above %s for bins "
-            "above %ss",
-            test_options['lower-limit-coefficient'],
-            test_options['duration-bin-lower-limit']
-        )
-
-    trfits_files = []
-    missed_files = 0
-    found_files = 0
-    while found_files < combined_days and missed_files < 10:
-        # Loop through the possible file locations and see if the file exists
-        date_test -= timedelta(days=1)
-        date_out = date_test.strftime("%Y_%m_%d")
-        trfits_filename = controls['trfits-format'].format(date=date_out)
-        # Check that the file exists:
-        if not os.path.exists(trfits_filename):
-            missed_files += 1
-            logging.info(f"File {trfits_filename} does not exist - skipping")
-            continue
-        if not len(trfits_files):
-            end_date = date_out
-        # This is now the oldest file
-        first_date = date_out
-        # reset the "missed files" counter, and add to the "found files"
files" - missed_files = 0 - found_files += 1 - trfits_files.append(trfits_filename) - - if 'check-daily-output' in controls: - check_trigger_files(trfits_files, test_options, controls) - - if missed_files == 10: - # If more than 10 days between files, something wrong with analysis. - # warn and use fewer files - 10 here is chosen to be an unusual amount - # of time for the analysis to be down in standard operation - logging.warning('More than 10 days between files, only using ' - f'{found_files} files for combination!') - - file_id_str = f'{first_date}-{end_date}' - if 'output-id-str' in controls: - file_id_str += f"-{controls['output-id-str']}" - out_fname = f'{file_id_str}-TRIGGER_FITS_COMBINED' - combined_options['output'] = os.path.join(output_dir, out_fname + '.hdf') - - if not trfits_files: - raise ValueError("No files meet the criteria") - - combined_options['trfits-files'] = ' '.join(trfits_files) - - combined_args = ['pycbc_live_combine_single_significance_fits'] - combined_args += dict_to_args(combined_options) - - run_and_error(combined_args, controls) - - if 'variable-trigger-fits' in controls: - logging.info('Copying combined fits file to local filesystem') - try: - shutil.copyfile( - combined_options['output'], - controls['variable-trigger-fits'] - ) - except Exception as e: - mail_volunteers_error( - controls, - [str(e)], - "PyCBC live could not copy to variable trigger fits file" - ) - raise e - logging.info( - "%s updated to link to %s", - controls['variable-trigger-fits'], - combined_options['output'] - ) - - logging.info("Plotting combined fits") - # Add plotting for combined fits, and linking to the public directory - combined_plot_output = os.path.join(output_dir, - f"{{ifo}}-{out_fname}-{{type}}.png") - combined_plot_arguments = [ - 'pycbc_live_plot_combined_single_significance_fits', - '--combined-fits-file', - combined_options['output'], - '--output-plot-name-format', - combined_plot_output, - '--log-colormap' - ] - - run_and_error(combined_plot_arguments, controls) - - combined_plot_outputs = [ - combined_plot_output.format(ifo=ifo, type='fit_coeffs') for ifo in - combined_options['ifos'].split() - ] - combined_plot_outputs += [ - combined_plot_output.format(ifo=ifo, type='counts') for ifo in - combined_options['ifos'].split() - ] - - # Link the plots to the public-dir if wanted - if 'public-dir' in controls: - logging.info("Linking combined fits") - for cpo in combined_plot_outputs: - symlink(cpo, public_dir) - - logging.info('Done') - - -def wait_for_utc_time(target_str): - """Wait until the UTC time is as given by `target_str`, in HH:MM:SS format. - """ - target_hour, target_minute, target_second = map(int, target_str.split(':')) - now = datetime.utcnow() - # for today's target, take now and replace the time - target_today = now + relativedelta( - hour=target_hour, minute=target_minute, second=target_second - ) - # for tomorrow's target, take now, add one day, and replace the time - target_tomorrow = now + relativedelta( - days=1, hour=target_hour, minute=target_minute, second=target_second - ) - next_target = target_today if now <= target_today else target_tomorrow - sleep_seconds = (next_target - now).total_seconds() - logging.info('Waiting %.0f s', sleep_seconds) - time.sleep(sleep_seconds) - - -parser = argparse.ArgumentParser(description=__doc__) -pycbc.add_common_pycbc_options(parser) -parser.add_argument( - '--config-file', - required=True -) -parser.add_argument( - '--date', - help='Date to analyse, if not given, will analyse yesterday (UTC). 
-         'Format YYYY_MM_DD. Do not use if using --run-daily-at.'
-)
-parser.add_argument(
-    '--combine-only',
-    action='store_true',
-    help="Only do the combination of singles fit files."
-)
-parser.add_argument(
-    '--daily-only',
-    action='store_true',
-    help="Only do the daily singles fitting."
-)
-parser.add_argument(
-    '--run-daily-at',
-    metavar='HH:MM:SS',
-    help='Stay running and repeat the fitting daily at the given UTC hour.'
-)
-args = parser.parse_args()
-
-pycbc.init_logging(args.verbose, default_level=1)
-
-if args.run_daily_at is not None and args.date is not None:
-    parser.error('Cannot take --run-daily-at and --date at the same time')
-
-if args.run_daily_at is not None:
-    # keep running and repeat the fitting every day at the given hour
-    if not re.match('[0-9][0-9]:[0-9][0-9]:[0-9][0-9]', args.run_daily_at):
-        parser.error('--run-daily-at takes a UTC time in the format HH:MM:SS')
-    logging.info('Starting in daily run mode')
-    while True:
-        wait_for_utc_time(args.run_daily_at)
-        logging.info('==== Time to update the single fits, waking up ====')
-        # Get the date string for yesterday's triggers
-        day_dt = datetime.utcnow() - timedelta(days=1)
-        day_str = day_dt.strftime('%Y_%m_%d')
-        do_fitting(args, day_dt, day_str)
-else:
-    # run just once
-    if args.date:
-        day_str = args.date
-        day_dt = datetime.strptime(args.date, '%Y_%m_%d')
-    else:
-        # Get the date string for yesterday's triggers
-        day_dt = datetime.utcnow() - timedelta(days=1)
-        day_str = day_dt.strftime('%Y_%m_%d')
-    do_fitting(args, day_dt, day_str)
diff --git a/bin/plotting/pycbc_plot_bank_corner b/bin/plotting/pycbc_plot_bank_corner
index 4494ab56eb8..01fdf9100cf 100644
--- a/bin/plotting/pycbc_plot_bank_corner
+++ b/bin/plotting/pycbc_plot_bank_corner
@@ -72,10 +72,6 @@ parser.add_argument("--parameters",
                          "property of that parameter will be used. If not "
                          "provided, will plot all of the parameters in the "
                          "bank.")
-parser.add_argument("--log-parameters",
-                    nargs="+",
-                    default=[],
-                    help="Plot these parameters on a log scale")
 parser.add_argument('--plot-histogram',
                     action='store_true',
                     help="Plot 1D histograms of parameters on the "
diff --git a/pycbc/live/__init__.py b/pycbc/live/__init__.py
index 4037f6634ae..5a3a40c901a 100644
--- a/pycbc/live/__init__.py
+++ b/pycbc/live/__init__.py
@@ -4,3 +4,4 @@
 from .snr_optimizer import *
 from .significance_fits import *
+from .supervision import *
diff --git a/pycbc/live/supervision.py b/pycbc/live/supervision.py
new file mode 100644
index 00000000000..858dc6c782b
--- /dev/null
+++ b/pycbc/live/supervision.py
@@ -0,0 +1,154 @@
+# Copyright (C) 2023 Arthur Tolley, Gareth Cabourn Davies
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 3 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+"""
+This module contains functions for supervising codes to run regularly
+during pycbc_live production, taking input from the search and returning
+files which can be used in the search.
+This module is primarily used in the pycbc_live_supervise_* programs.
+"""
+
+import logging
+import subprocess
+import time
+import os
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+import pycbc
+
+logger = logging.getLogger('pycbc.live.supervision')
+
+
+def symlink(target, link_name):
+    """
+    Create a symbolic link replacing the destination and checking for
+    errors.
+    """
+    # Ensure that the target and link name are absolute paths
+    target = os.path.abspath(target)
+    link_name = os.path.abspath(link_name)
+    logger.info("Linking %s to %s", target, link_name)
+    try:
+        subprocess.run(['ln', '-sf', target, link_name], check=True)
+    except subprocess.CalledProcessError as sub_err:
+        logger.error("Could not link %s to %s", target, link_name)
+        raise sub_err
+
+
+def dict_to_args(opts_dict):
+    """
+    Convert an option dictionary into a list to be used by subprocess.run
+    """
+    dargs = []
+    for option, value in opts_dict.items():
+        dargs.append('--' + option.strip())
+        if value == '':
+            # option is a flag, do nothing
+            continue
+        if len(value.split()) > 1:
+            # value is a list, append individually
+            for v in value.split():
+                dargs.append(v)
+        else:
+            # Single value option - append once
+            dargs.append(value)
+    return dargs
+
+
+def mail_volunteers_error(controls, mail_body_lines, subject):
+    """
+    Email a list of people, defined by mail-volunteers-file
+    To be used for errors or unusual occurrences
+    """
+    with open(controls['mail-volunteers-file'], 'r') as mail_volunteers_file:
+        volunteers = [volunteer.strip() for volunteer in
+                      mail_volunteers_file.readlines()]
+    logger.info("Emailing %s with warnings", ' '.join(volunteers))
+    mail_command = [
+        'mail',
+        '-s',
+        subject
+    ]
+    mail_command += volunteers
+    mail_body = '\n'.join(mail_body_lines)
+    try:
+        subprocess.run(mail_command, input=mail_body, text=True, check=True)
+    except subprocess.CalledProcessError as sub_err:
+        logger.error("Could not send mail on error")
+        raise sub_err
+
+
+def run_and_error(command_arguments, controls):
+    """
+    Wrapper around subprocess.run to catch errors and send emails if required
+    """
+    logger.info("Running %s", " ".join(command_arguments))
+    command_output = subprocess.run(
+        command_arguments,
+        capture_output=True
+    )
+
+    if command_output.returncode:
+        error_contents = [' '.join(command_arguments), '\n',
+                          command_output.stderr.decode()]
+        if 'mail-volunteers-file' in controls:
+            mail_volunteers_error(
+                controls,
+                error_contents,
+                f"PyCBC live could not run {command_arguments[0]}"
+            )
+        err_msg = f"Could not run {command_arguments[0]}:\n"
+        err_msg += ' '.join(error_contents)
+        raise subprocess.SubprocessError(err_msg)
+
+
+def wait_for_utc_time(target_str):
+    """Wait until the UTC time is as given by `target_str`, in HH:MM:SS format.
+    """
+    target_hour, target_minute, target_second = map(int, target_str.split(':'))
+    now = datetime.utcnow()
+    # for today's target, take now and replace the time
+    target_today = now + relativedelta(
+        hour=target_hour, minute=target_minute, second=target_second
+    )
+    # for tomorrow's target, take now, add one day, and replace the time
+    target_tomorrow = now + relativedelta(
+        days=1, hour=target_hour, minute=target_minute, second=target_second
+    )
+    next_target = target_today if now <= target_today else target_tomorrow
+    sleep_seconds = (next_target - now).total_seconds()
+    logger.info('Waiting %.0f s', sleep_seconds)
+    time.sleep(sleep_seconds)
+
+
+def ensure_directories(control_values, day_str):
+    """
+    Ensure that the required directories exist
+    """
+    output_dir = os.path.join(
+        control_values['output-directory'],
+        day_str
+    )
+    pycbc.makedir(output_dir)
+    if 'public-dir' in control_values:
+        # The public directory will be in subdirectories for the year, month,
+        # day, e.g. 2024_04_12 will be in 2024/04/12.
+        public_dir = os.path.join(
+            control_values['public-dir'],
+            *day_str.split('_')
+        )
+        pycbc.makedir(public_dir)
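
A note on usage: with all four stage flags enabled and daily mode selected, the
new supervisor would be launched along the lines of the sketch below. The flags
come from the argparse block of pycbc_live_supervise_collated_trigger_fits
above; the config path is a hypothetical placeholder.

    pycbc_live_supervise_collated_trigger_fits \
        --config-file /path/to/live_fits_supervision.ini \
        --fit-by-template \
        --fit-over-multiparam \
        --single-significance-fits \
        --combine-significance-fits \
        --run-daily-at 16:00:00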
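
The --config-file is parsed by read_options() using InterpolatingConfigParser,
so every section named in supervise_collation_fits_dq must be present: options
in the *_control sections steer the supervisor itself, while options in the
paired sections are forwarded verbatim (via sv.dict_to_args) to the
corresponding executable. A minimal illustrative sketch, in which all paths
and format values are invented placeholders (only the section names, control
option names, and format fields come from the code above):

    [environment]
    ; used for interpolation only; deleted by read_options()

    [control]
    ifos = H1 L1
    output-directory = /path/to/fits/output
    public-dir = /path/to/public_html
    ; optional: mail-volunteers-file = /path/to/email_addresses.txt

    [collation_control]
    collated-triggers-format = {ifos}-COLLATED_TRIGGERS-{start}-{duration}.hdf

    [collation]
    ; options forwarded to pycbc_live_collate_triggers

    [fit_by_template_control]
    fit-by-template-format = {date}-{ifo}-FIT_BY_TEMPLATE.hdf

    [fit_by_template]
    ; options forwarded to pycbc_fit_sngls_by_template

    [fit_over_multiparam_control]
    combined-days = 14
    fit-by-format = {date}-{ifo}-FIT_BY_TEMPLATE.hdf
    fit-over-format = {dates}-{ifo}-FIT_OVER_MULTIPARAM.hdf
    ; optional: variable-fit-over-param = /path/to/{ifo}-FIT_OVER_LATEST.hdf
    ; optional replay keys: replay-start-time, true-start-time, replay-duration

    [fit_over_multiparam]
    ; options forwarded to pycbc_fit_sngls_over_multiparam

    [plot_fit]
    ; options forwarded to pycbc_plot_bank_corner

    [significance_daily_fits_control]
    sig-daily-format = {date}-SINGLE_SIGNIFICANCE_FITS.hdf

    [significance_daily_fits]
    ; options forwarded to pycbc_live_single_significance_fits

    [plot_significance_daily]
    ; options forwarded to pycbc_live_plot_single_significance_fits

    [significance_combined_fits_control]
    combined-days = 7
    daily-format = {date}-SINGLE_SIGNIFICANCE_FITS.hdf
    outfile-format = {date}-{date_range}-COMBINED_SIGNIFICANCE_FITS.hdf
    ; optional: variable-significance-fits = /path/to/SIGNIFICANCE_FITS_LATEST.hdf

    [significance_combined_fits]
    ; options forwarded to pycbc_live_combine_single_significance_fits

    [plot_significance_combined_control]
    ; must contain {date_range}; {ifo} and {type} are filled per plot
    output-plot-name-format = {ifo}-{date_range}-{type}.png

    [plot_significance_combined]
    ; options forwarded to pycbc_live_plot_combined_single_significance_fits

    [test]
    ; read by the supervisor but not used by the code above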
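
The helpers in pycbc/live/supervision.py are deliberately generic, so other
supervisors can reuse them. A minimal sketch of how they compose, with the
executable name, option values, and paths all hypothetical:

    import logging
    from pycbc.live import supervision as sv

    logging.basicConfig(level=logging.INFO)

    # Without a 'mail-volunteers-file' key, run_and_error() skips the email
    # step and just raises on failure
    controls = {'output-directory': '/path/to/output'}

    # Create /path/to/output/2024_07_24 (and, if 'public-dir' were set, the
    # year/month/day public subdirectories)
    sv.ensure_directories(controls, '2024_07_24')

    # dict_to_args() maps {option: value} to a subprocess argument list:
    # '' marks a flag, and a space-separated value becomes several arguments
    opts = {'ifo': 'H1', 'verbose': '', 'input-files': 'a.hdf b.hdf'}
    cmd = ['some_executable'] + sv.dict_to_args(opts)
    # cmd == ['some_executable', '--ifo', 'H1', '--verbose',
    #         '--input-files', 'a.hdf', 'b.hdf']
    sv.run_and_error(cmd, controls)

    # Block until the next 16:00:00 UTC, as --run-daily-at does
    sv.wait_for_utc_time('16:00:00')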