From cf6447ebd37848b0f7633397b4ea65cd30580c7a Mon Sep 17 00:00:00 2001
From: maxtrevor <65971534+maxtrevor@users.noreply.github.com>
Date: Thu, 22 Aug 2024 10:59:00 -0400
Subject: [PATCH] Idq live (#4850)

* Have pycbc live mark flagged triggers instead of removing

* Make stat usable in low-latency

* Add command line argument for whether to use idq for reweighting

* Add logging for iDQ flagged triggers

* Fix bug when using ifo with no dq

* Improve logic for getting ifo from trigs

* Update for compatibility with Gareth's stat reloading code

* Modify how trig ifo is gotten and add debug statements

* Use logging not print for debugging

* logger not logging

* Fix where tnum is set

* Get rid of excess logging

* Address Gareth's comments

* Codeclimate

* Apply suggestions from code review

Co-authored-by: Gareth S Cabourn Davies

---------

Co-authored-by: Gareth S Cabourn Davies
---
 bin/pycbc_live       |  36 ++++++++++----
 pycbc/events/stat.py | 112 +++++++++++++++++++++++++++++--------------
 pycbc/frame/frame.py |  35 +++++++++-----
 3 files changed, 127 insertions(+), 56 deletions(-)

diff --git a/bin/pycbc_live b/bin/pycbc_live
index be649e6ae0e..52a0de2433a 100755
--- a/bin/pycbc_live
+++ b/bin/pycbc_live
@@ -848,6 +848,8 @@ parser.add_argument('--idq-state-channel', action=MultiDetMultiColonOptionAction
 parser.add_argument('--idq-threshold', type=float,
                     help='Threshold used to veto triggers at times of '
                          'low iDQ False Alarm Probability')
+parser.add_argument('--idq-reweighting', action='store_true', default=False,
+                    help='Reweight triggers based on iDQ False Alarm Probability')
 parser.add_argument('--data-quality-channel',
                     action=MultiDetMultiColonOptionAction,
                     help="Channel containing data quality information. Used "
@@ -1311,17 +1313,35 @@ with ctx:
                     if len(results[ifo][key]):
                         results[ifo][key] = results[ifo][key][idx]
                 if data_reader[ifo].idq is not None:
-                    logging.info("Checking %s's iDQ information", ifo)
+                    logging.info("Reading %s's iDQ information", ifo)
                     start = data_reader[ifo].start_time
                     times = results[ifo]['end_time']
-                    idx = data_reader[ifo].idq.indices_of_flag(
+                    flag_active = data_reader[ifo].idq.flag_at_times(
                         start, valid_pad, times,
-                        padding=data_reader[ifo].dq_padding)
-                    logging.info('Keeping %d/%d %s triggers after iDQ',
-                                 len(idx), len(times), ifo)
-                    for key in results[ifo]:
-                        if len(results[ifo][key]):
-                            results[ifo][key] = results[ifo][key][idx]
+                        padding=data_reader[ifo].dq_padding
+                    )
+
+                    if args.idq_reweighting:
+                        logging.info(
+                            'iDQ flagged %d/%d %s triggers',
+                            numpy.sum(flag_active),
+                            len(times),
+                            ifo
+                        )
+                        results[ifo]['dq_state'] = flag_active.astype(int)
+                    else:
+                        # use idq as a veto
+                        keep = numpy.logical_not(flag_active)
+                        logging.info(
+                            'Keeping %d/%d %s triggers after iDQ',
+                            numpy.sum(keep),
+                            len(times),
+                            ifo
+                        )
+                        for key in results[ifo]:
+                            if len(results[ifo][key]):
+                                results[ifo][key] = \
+                                    results[ifo][key][keep]
 
         # Calculate and add the psd variation for the results
         if args.psd_variation:
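Note: the pycbc_live hunk above replaces the unconditional iDQ veto with a choice between two behaviours. A minimal sketch of the two branches, using hypothetical trigger values in place of results[ifo] and of the boolean array returned by flag_at_times():

    import numpy

    # hypothetical stand-ins for one detector's results[ifo] and for the
    # boolean flag array returned by flag_at_times()
    results = {'end_time': numpy.array([10.1, 10.2, 10.3, 10.4]),
               'snr': numpy.array([5.0, 6.0, 7.0, 8.0])}
    flag_active = numpy.array([False, True, False, True])

    idq_reweighting = True  # stands in for args.idq_reweighting
    if idq_reweighting:
        # keep every trigger, but record which ones were flagged; the DQ
        # statistic later penalizes triggers with dq_state == 1
        results['dq_state'] = flag_active.astype(int)
    else:
        # veto behaviour: drop flagged triggers entirely
        keep = numpy.logical_not(flag_active)
        results = {key: val[keep] for key, val in results.items()}

Either way the flag information is consumed downstream; only the reweighting branch preserves flagged triggers for ranking.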
diff --git a/pycbc/events/stat.py b/pycbc/events/stat.py
index f61e7c55b66..ff8da9a2f19 100644
--- a/pycbc/events/stat.py
+++ b/pycbc/events/stat.py
@@ -1670,6 +1670,12 @@ def single(self, trigs):
         numpy.ndarray
             The array of single detector values
         """
+        try:
+            # exists if accessed via coinc_findtrigs
+            self.curr_tnum = trigs.template_num
+        except AttributeError:
+            # exists for SingleDetTriggers & pycbc_live get_coinc
+            self.curr_tnum = trigs['template_id']
         # single-ifo stat = log of noise rate
         sngl_stat = self.lognoiserate(trigs)
 
@@ -1681,12 +1687,6 @@ def single(self, trigs):
         singles['end_time'] = trigs['end_time'][:]
         singles['sigmasq'] = trigs['sigmasq'][:]
         singles['snr'] = trigs['snr'][:]
-        try:
-            # exists if accessed via coinc_findtrigs
-            self.curr_tnum = trigs.template_num
-        except AttributeError:
-            # exists for SingleDetTriggers & pycbc_live get_coinc
-            self.curr_tnum = trigs['template_id']
 
         # Store benchmark log volume as single-ifo information since the coinc
         # method does not have access to template id
@@ -2271,14 +2271,46 @@ def __init__(self, sngl_ranking, files=None, ifos=None,
                                          ifos=ifos, **kwargs)
         self.dq_rates_by_state = {}
         self.dq_bin_by_tid = {}
-        self.dq_state_segments = {}
+        self.dq_state_segments = None
+        self.low_latency = False
+        self.single_dtype.append(('dq_state', int))
 
         for ifo in self.ifos:
             key = f'{ifo}-dq_stat_info'
             if key in self.files.keys():
                 self.dq_rates_by_state[ifo] = self.assign_dq_rates(key)
                 self.dq_bin_by_tid[ifo] = self.assign_template_bins(key)
-                self.dq_state_segments[ifo] = self.setup_segments(key)
+                self.check_low_latency(key)
+                if not self.low_latency:
+                    if self.dq_state_segments is None:
+                        self.dq_state_segments = {}
+                    self.dq_state_segments[ifo] = self.setup_segments(key)
+
+    def check_low_latency(self, key):
+        """
+        Check if the statistic file indicates low latency mode.
+        Parameters
+        ----------
+        key: str
+            Statistic file key string.
+        Returns
+        -------
+        None
+        """
+        ifo = key.split('-')[0]
+        with h5py.File(self.files[key], 'r') as dq_file:
+            ifo_grp = dq_file[ifo]
+            if 'dq_segments' not in ifo_grp.keys():
+                # if segs are not in file, we must be in LL
+                if self.dq_state_segments is not None:
+                    raise ValueError(
+                        'Either all dq stat files must have segments or none'
+                    )
+                self.low_latency = True
+            elif self.low_latency:
+                raise ValueError(
+                    'Either all dq stat files must have segments or none'
+                )
 
     def assign_template_bins(self, key):
         """
@@ -2337,9 +2369,7 @@ def assign_dq_rates(self, key):
 
     def setup_segments(self, key):
         """
-        Check if segments definitions are in stat file
-        If they are, we are running offline and need to store them
-        If they aren't, we are running online
+        Store segments from stat file
         """
         ifo = key.split('-')[0]
         with h5py.File(self.files[key], 'r') as dq_file:
@@ -2368,35 +2398,32 @@ def update_file(self, key):
             return True
         # We also need to check if the DQ files have updated
        if key.endswith('dq_stat_info'):
+            ifo = key.split('-')[0]
             logger.info(
                 "Updating %s statistic %s file",
-                ''.join(self.ifos),
+                ifo,
                 key
             )
-            self.assign_dq_rates(key)
-            self.assign_template_bins(key)
-            self.setup_segments(key)
+            self.dq_rates_by_state[ifo] = self.assign_dq_rates(key)
+            self.dq_bin_by_tid[ifo] = self.assign_template_bins(key)
             return True
         return False
 
-    def find_dq_noise_rate(self, trigs, dq_state):
+    def find_dq_noise_rate(self, trigs):
        """Get dq values for a specific ifo and dq states"""
 
-        try:
-            tnum = trigs.template_num
-        except AttributeError:
-            tnum = trigs['template_id']
-
         try:
             ifo = trigs.ifo
         except AttributeError:
-            ifo = trigs['ifo']
-            assert len(numpy.unique(ifo)) == 1
-            # Should be exactly one ifo provided
-            ifo = ifo[0]
+            ifo = trigs.get('ifo', None)
+        if ifo is None:
+            ifo = self.ifos[0]
+        assert ifo in self.ifos
 
-        dq_val = numpy.zeros(len(dq_state))
+        dq_state = trigs['dq_state']
+        dq_val = numpy.ones(len(dq_state))
 
+        tnum = self.curr_tnum
         if ifo in self.dq_rates_by_state:
             for (i, st) in enumerate(dq_state):
                 if isinstance(tnum, numpy.ndarray):
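Note: find_dq_noise_rate() now reads the per-trigger DQ state directly from trigs['dq_state'] and defaults to a rate of 1 (no rescaling) rather than 0, so detectors without DQ information are left untouched. A simplified sketch of the lookup it performs; the bin names, template mapping, and rate values here are hypothetical:

    import numpy

    # one noise-rate multiplier per DQ state, for each template bin
    dq_rates_by_bin = {'bin0': numpy.array([1.0, 2.5, 10.0]),
                       'bin1': numpy.array([1.0, 1.2, 4.0])}
    # which bin each template id belongs to
    dq_bin_by_tid = {0: 'bin0', 1: 'bin0', 2: 'bin1'}

    def dq_noise_rate(template_ids, dq_states):
        """Look up the noise-rate multiplier for each trigger."""
        dq_val = numpy.ones(len(dq_states))  # default: no rescaling
        for i, (tid, st) in enumerate(zip(template_ids, dq_states)):
            dq_val[i] = dq_rates_by_bin[dq_bin_by_tid[tid]][st]
        return dq_val

    # triggers in templates 0 and 2; only the second is iDQ-flagged
    print(dq_noise_rate([0, 2], [0, 1]))  # [1.  1.2]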
""" - # make sure every trig has a dq state - try: - ifo = trigs.ifo - except AttributeError: - ifo = trigs['ifo'] - assert len(numpy.unique(ifo)) == 1 - # Should be exactly one ifo provided - ifo = ifo[0] - - dq_state = self.find_dq_state_by_time(ifo, trigs['end_time'][:]) - dq_rate = self.find_dq_noise_rate(trigs, dq_state) + dq_rate = self.find_dq_noise_rate(trigs) dq_rate = numpy.maximum(dq_rate, 1) logr_n = ExpFitFgBgNormStatistic.lognoiserate( @@ -2455,6 +2472,27 @@ def lognoiserate(self, trigs): logr_n += numpy.log(dq_rate) return logr_n + def single(self, trigs): + # make sure every trig has a dq state + try: + ifo = trigs.ifo + except AttributeError: + ifo = trigs.get('ifo', None) + if ifo is None: + ifo = self.ifos[0] + assert ifo in self.ifos + + singles = ExpFitFgBgNormStatistic.single(self, trigs) + + if self.low_latency: + # trigs should already have a dq state assigned + singles['dq_state'] = trigs['dq_state'][:] + else: + singles['dq_state'] = self.find_dq_state_by_time( + ifo, trigs['end_time'][:] + ) + return singles + class DQExpFitFgBgKDEStatistic(DQExpFitFgBgNormStatistic): """ diff --git a/pycbc/frame/frame.py b/pycbc/frame/frame.py index a67a3d090d9..bea7386418a 100644 --- a/pycbc/frame/frame.py +++ b/pycbc/frame/frame.py @@ -896,8 +896,8 @@ def __init__(self, frame_src, force_update_cache=force_update_cache, increment_update_cache=increment_update_cache) - def indices_of_flag(self, start_time, duration, times, padding=0): - """ Return the indices of the times lying in the flagged region + def flag_at_times(self, start_time, duration, times, padding=0): + """ Check whether the idq flag was on at given times Parameters ---------- @@ -905,32 +905,45 @@ def indices_of_flag(self, start_time, duration, times, padding=0): Beginning time to request for duration: int Number of seconds to check. + times: array of floats + Times to check for an active flag padding: float - Number of seconds to add around flag inactive times to be considered - inactive as well. + Amount of time in seconds to flag around samples + below the iDQ FAP threshold Returns ------- - indices: numpy.ndarray - Array of indices marking the location of triggers within valid - time. 
diff --git a/pycbc/frame/frame.py b/pycbc/frame/frame.py
index a67a3d090d9..bea7386418a 100644
--- a/pycbc/frame/frame.py
+++ b/pycbc/frame/frame.py
@@ -896,8 +896,8 @@ def __init__(self, frame_src,
                             force_update_cache=force_update_cache,
                             increment_update_cache=increment_update_cache)
 
-    def indices_of_flag(self, start_time, duration, times, padding=0):
-        """ Return the indices of the times lying in the flagged region
+    def flag_at_times(self, start_time, duration, times, padding=0):
+        """ Check whether the iDQ flag was on at given times
 
         Parameters
         ----------
@@ -905,32 +905,45 @@ def indices_of_flag(self, start_time, duration, times, padding=0):
             Beginning time to request for
         duration: int
             Number of seconds to check.
+        times: array of floats
+            Times to check for an active flag
         padding: float
-            Number of seconds to add around flag inactive times to be considered
-            inactive as well.
+            Amount of time in seconds to flag around samples
+            below the iDQ FAP threshold
 
         Returns
         -------
-        indices: numpy.ndarray
-            Array of indices marking the location of triggers within valid
-            time.
+        flag_state: numpy.ndarray
+            Boolean array of whether flag was on at given times
         """
-        from pycbc.events.veto import indices_outside_times
+        from pycbc.events.veto import indices_within_times
+
+        # convert start and end times to buffer indices
         sr = self.idq.raw_buffer.sample_rate
         s = int((start_time - self.idq.raw_buffer.start_time - padding) * sr) - 1
         e = s + int((duration + padding) * sr) + 1
+
+        # find samples when iDQ FAP is below threshold and state is valid
         idq_fap = self.idq.raw_buffer[s:e]
-        stamps = idq_fap.sample_times.numpy()
         low_fap = idq_fap.numpy() <= self.threshold
         idq_valid = self.idq_state.raw_buffer[s:e]
         idq_valid = idq_valid.numpy().astype(bool)
         valid_low_fap = numpy.logical_and(idq_valid, low_fap)
+
+        # find times corresponding to the valid low FAP samples
         glitch_idx = numpy.flatnonzero(valid_low_fap)
+        stamps = idq_fap.sample_times.numpy()
         glitch_times = stamps[glitch_idx]
+
+        # construct start and end times of flag segments
         starts = glitch_times - padding
         ends = starts + 1.0 / sr + padding * 2.0
+
+        # check if times were flagged
+        idx = indices_within_times(times, starts, ends)
-        idx = indices_outside_times(times, starts, ends)
-        return idx
+        flagged_bool = numpy.zeros(len(times), dtype=bool)
+        flagged_bool[idx] = True
+        return flagged_bool
 
     def advance(self, blocksize):
         """ Add blocksize seconds more to the buffer, push blocksize seconds
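Note: flag_at_times() turns each valid low-FAP iDQ sample into a flagged window one sample wide (plus padding on each side), then tests which trigger times fall inside any window via pycbc.events.veto.indices_within_times. A self-contained sketch of the same logic, with the segment-membership test written out explicitly and hypothetical 1 Hz iDQ samples:

    import numpy

    def flag_at_times(sample_times, low_fap, times, padding=0.0):
        """Toy version: build [start, end) windows around low-FAP samples
        and test which of the given times fall inside one."""
        dt = sample_times[1] - sample_times[0]   # sample spacing, 1/sr
        starts = sample_times[low_fap] - padding
        ends = starts + dt + 2.0 * padding
        flagged = numpy.zeros(len(times), dtype=bool)
        for i, t in enumerate(times):
            flagged[i] = numpy.any((starts <= t) & (t < ends))
        return flagged

    # 1 Hz iDQ FAP samples; only the sample at t=3 is below threshold
    sample_times = numpy.arange(0.0, 6.0)
    low_fap = numpy.array([False, False, False, True, False, False])
    print(flag_at_times(sample_times, low_fap, numpy.array([2.5, 3.5])))
    # [False  True]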