Idq live #4850

Merged: 15 commits, Aug 22, 2024
Changes from all commits
36 changes: 28 additions & 8 deletions bin/pycbc_live
@@ -848,6 +848,8 @@ parser.add_argument('--idq-state-channel', action=MultiDetMultiColonOptionAction
 parser.add_argument('--idq-threshold', type=float,
                     help='Threshold used to veto triggers at times of '
                          'low iDQ False Alarm Probability')
+parser.add_argument('--idq-reweighting', action='store_true', default=False,
+                    help='Reweight triggers based on iDQ False Alarm Probability')
 parser.add_argument('--data-quality-channel',
                     action=MultiDetMultiColonOptionAction,
                     help="Channel containing data quality information. Used "
@@ -1311,17 +1313,35 @@ with ctx:
                         if len(results[ifo][key]):
                             results[ifo][key] = results[ifo][key][idx]
                 if data_reader[ifo].idq is not None:
-                    logging.info("Checking %s's iDQ information", ifo)
+                    logging.info("Reading %s's iDQ information", ifo)
                     start = data_reader[ifo].start_time
                     times = results[ifo]['end_time']
-                    idx = data_reader[ifo].idq.indices_of_flag(
+                    flag_active = data_reader[ifo].idq.flag_at_times(
                         start, valid_pad, times,
-                        padding=data_reader[ifo].dq_padding)
-                    logging.info('Keeping %d/%d %s triggers after iDQ',
-                                 len(idx), len(times), ifo)
-                    for key in results[ifo]:
-                        if len(results[ifo][key]):
-                            results[ifo][key] = results[ifo][key][idx]
+                        padding=data_reader[ifo].dq_padding
+                    )
+
+                    if args.idq_reweighting:
+                        logging.info(
+                            'iDQ flagged %d/%d %s triggers',
+                            numpy.sum(flag_active),
+                            len(times),
+                            ifo
+                        )
+                        results[ifo]['dq_state'] = flag_active.astype(int)
+                    else:
+                        # use idq as a veto
+                        keep = numpy.logical_not(flag_active)
+                        logging.info(
+                            'Keeping %d/%d %s triggers after iDQ',
+                            numpy.sum(keep),
+                            len(times),
+                            ifo
+                        )
+                        for key in results[ifo]:
+                            if len(results[ifo][key]):
+                                results[ifo][key] = \
+                                    results[ifo][key][keep]
 
         # Calculate and add the psd variation for the results
         if args.psd_variation:
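The behavioural difference between the two branches, in miniature: a toy sketch with invented trigger values, independent of the pycbc_live machinery.

import numpy

# iDQ flag state at each trigger time (True = flagged)
flag_active = numpy.array([False, True, False, True])
snr = numpy.array([5.1, 9.3, 6.0, 7.2])

# With --idq-reweighting: keep all triggers and record the flag,
# so the ranking statistic can downweight flagged ones later
dq_state = flag_active.astype(int)    # -> [0 1 0 1]

# Default (veto) behaviour: drop flagged triggers entirely
keep = numpy.logical_not(flag_active)
print(snr[keep])                      # -> [5.1 6. ]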
112 changes: 75 additions & 37 deletions pycbc/events/stat.py
@@ -1670,6 +1670,12 @@ def single(self, trigs):
         numpy.ndarray
             The array of single detector values
         """
+        try:
+            # exists if accessed via coinc_findtrigs
+            self.curr_tnum = trigs.template_num
+        except AttributeError:
+            # exists for SingleDetTriggers & pycbc_live get_coinc
+            self.curr_tnum = trigs['template_id']
 
         # single-ifo stat = log of noise rate
         sngl_stat = self.lognoiserate(trigs)
@@ -1681,12 +1687,6 @@
         singles['end_time'] = trigs['end_time'][:]
         singles['sigmasq'] = trigs['sigmasq'][:]
         singles['snr'] = trigs['snr'][:]
-        try:
-            # exists if accessed via coinc_findtrigs
-            self.curr_tnum = trigs.template_num
-        except AttributeError:
-            # exists for SingleDetTriggers & pycbc_live get_coinc
-            self.curr_tnum = trigs['template_id']
 
         # Store benchmark log volume as single-ifo information since the coinc
         # method does not have access to template id
@@ -2271,14 +2271,46 @@ def __init__(self, sngl_ranking, files=None, ifos=None,
                          ifos=ifos, **kwargs)
         self.dq_rates_by_state = {}
         self.dq_bin_by_tid = {}
-        self.dq_state_segments = {}
+        self.dq_state_segments = None
+        self.low_latency = False
+        self.single_dtype.append(('dq_state', int))
 
         for ifo in self.ifos:
             key = f'{ifo}-dq_stat_info'
             if key in self.files.keys():
                 self.dq_rates_by_state[ifo] = self.assign_dq_rates(key)
                 self.dq_bin_by_tid[ifo] = self.assign_template_bins(key)
-                self.dq_state_segments[ifo] = self.setup_segments(key)
+                self.check_low_latency(key)
+                if not self.low_latency:
+                    if self.dq_state_segments is None:
+                        self.dq_state_segments = {}
+                    self.dq_state_segments[ifo] = self.setup_segments(key)
+
+    def check_low_latency(self, key):
+        """
+        Check if the statistic file indicates low latency mode.
+
+        Parameters
+        ----------
+        key: str
+            Statistic file key string.
+
+        Returns
+        -------
+        None
+        """
+        ifo = key.split('-')[0]
+        with h5py.File(self.files[key], 'r') as dq_file:
+            ifo_grp = dq_file[ifo]
+            if 'dq_segments' not in ifo_grp.keys():
+                # if segs are not in file, we must be in LL
+                if self.dq_state_segments is not None:
+                    raise ValueError(
+                        'Either all dq stat files must have segments or none'
+                    )
+                self.low_latency = True
+            elif self.low_latency:
+                raise ValueError(
+                    'Either all dq stat files must have segments or none'
+                )
 
     def assign_template_bins(self, key):
         """
@@ -2337,9 +2369,7 @@ def assign_dq_rates(self, key):
 
     def setup_segments(self, key):
         """
-        Check if segments definitions are in stat file
-        If they are, we are running offline and need to store them
-        If they aren't, we are running online
+        Store segments from stat file
         """
         ifo = key.split('-')[0]
         with h5py.File(self.files[key], 'r') as dq_file:
@@ -2368,35 +2398,32 @@ def update_file(self, key):
             return True
         # We also need to check if the DQ files have updated
         if key.endswith('dq_stat_info'):
+            ifo = key.split('-')[0]
             logger.info(
                 "Updating %s statistic %s file",
-                ''.join(self.ifos),
+                ifo,
                 key
             )
-            self.assign_dq_rates(key)
-            self.assign_template_bins(key)
-            self.setup_segments(key)
+            self.dq_rates_by_state[ifo] = self.assign_dq_rates(key)
+            self.dq_bin_by_tid[ifo] = self.assign_template_bins(key)
             return True
         return False
 
-    def find_dq_noise_rate(self, trigs, dq_state):
+    def find_dq_noise_rate(self, trigs):
         """Get dq values for a specific ifo and dq states"""
 
-        try:
-            tnum = trigs.template_num
-        except AttributeError:
-            tnum = trigs['template_id']
-
         try:
[Review comment, Contributor] This try/except is used a couple of times now. It would be good to be in a function. Codeclimate will probably complain
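A sketch of the helper the reviewer is suggesting. The name _get_ifo and its placement on the statistic class are hypothetical; the PR as merged keeps the inline try/except in both find_dq_noise_rate() and single().

    def _get_ifo(self, trigs):
        """Return the single detector label for a trigger object."""
        try:
            # some trigger objects expose the ifo as an attribute
            ifo = trigs.ifo
        except AttributeError:
            # dict-like triggers may carry an 'ifo' entry; otherwise fall
            # back to the single configured detector
            ifo = trigs.get('ifo', None)
            if ifo is None:
                ifo = self.ifos[0]
        assert ifo in self.ifos
        return ifo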

             ifo = trigs.ifo
         except AttributeError:
-            ifo = trigs['ifo']
-        assert len(numpy.unique(ifo)) == 1
-        # Should be exactly one ifo provided
-        ifo = ifo[0]
+            ifo = trigs.get('ifo', None)
+            if ifo is None:
+                ifo = self.ifos[0]
+        assert ifo in self.ifos
 
-        dq_val = numpy.zeros(len(dq_state))
+        dq_state = trigs['dq_state']
+        dq_val = numpy.ones(len(dq_state))
+
+        tnum = self.curr_tnum
         if ifo in self.dq_rates_by_state:
             for (i, st) in enumerate(dq_state):
                 if isinstance(tnum, numpy.ndarray):
@@ -2437,24 +2464,35 @@ def lognoiserate(self, trigs):
             Array of log noise rate density for each input trigger.
         """
 
-        # make sure every trig has a dq state
-        try:
-            ifo = trigs.ifo
-        except AttributeError:
-            ifo = trigs['ifo']
-        assert len(numpy.unique(ifo)) == 1
-        # Should be exactly one ifo provided
-        ifo = ifo[0]
-
-        dq_state = self.find_dq_state_by_time(ifo, trigs['end_time'][:])
-        dq_rate = self.find_dq_noise_rate(trigs, dq_state)
+        dq_rate = self.find_dq_noise_rate(trigs)
         dq_rate = numpy.maximum(dq_rate, 1)
 
         logr_n = ExpFitFgBgNormStatistic.lognoiserate(
             self, trigs)
         logr_n += numpy.log(dq_rate)
         return logr_n
 
+    def single(self, trigs):
+        # make sure every trig has a dq state
+        try:
[Review comment, Contributor] I think an if hasattr(self, 'ifo') would be nicer here than the try/except. However I know that is against EAFP, and this is how it is done elsewhere
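For contrast, the look-before-you-leap form the reviewer mentions (reading their "self" as "trigs") would be roughly the following; a sketch, not what was merged:

if hasattr(trigs, 'ifo'):
    ifo = trigs.ifo
else:
    ifo = trigs.get('ifo', None)
    if ifo is None:
        ifo = self.ifos[0]
assert ifo in self.ifos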

+            ifo = trigs.ifo
+        except AttributeError:
+            ifo = trigs.get('ifo', None)
+            if ifo is None:
+                ifo = self.ifos[0]
+        assert ifo in self.ifos
+
+        singles = ExpFitFgBgNormStatistic.single(self, trigs)
+
+        if self.low_latency:
+            # trigs should already have a dq state assigned
+            singles['dq_state'] = trigs['dq_state'][:]
+        else:
+            singles['dq_state'] = self.find_dq_state_by_time(
+                ifo, trigs['end_time'][:]
+            )
+        return singles
 
 
 class DQExpFitFgBgKDEStatistic(DQExpFitFgBgNormStatistic):
     """
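To make the new dq_state column concrete, here is a toy rendering of the lookup that find_dq_noise_rate() performs and lognoiserate() applies. The detector name, bin names, and all rates below are invented for illustration:

import numpy

# noise-rate multipliers per (dq state -> template bin), as loaded
# from the dq stat file in the real code
dq_rates_by_state = {
    'L1': {
        0: {'bin0': 1.0, 'bin1': 1.0},    # flag off: no change
        1: {'bin0': 45.0, 'bin1': 3.2},   # flag on: noise rate inflated
    }
}
dq_bin_by_tid = {'L1': {0: 'bin0', 1: 'bin1'}}

dq_state = numpy.array([0, 1, 1])   # per-trigger iDQ state
tnum = numpy.array([0, 0, 1])       # per-trigger template id

dq_val = numpy.ones(len(dq_state))
for i, st in enumerate(dq_state):
    bin_name = dq_bin_by_tid['L1'][tnum[i]]
    dq_val[i] = dq_rates_by_state['L1'][st][bin_name]

# as in lognoiserate(): raise the log noise rate during flagged times
logr_n = numpy.array([-10.0, -10.0, -10.0])
logr_n += numpy.log(numpy.maximum(dq_val, 1))
print(logr_n)   # -> approximately [-10.  -6.19 -8.84]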
35 changes: 24 additions & 11 deletions pycbc/frame/frame.py
@@ -896,41 +896,54 @@ def __init__(self, frame_src,
                          force_update_cache=force_update_cache,
                          increment_update_cache=increment_update_cache)
 
-    def indices_of_flag(self, start_time, duration, times, padding=0):
-        """ Return the indices of the times lying in the flagged region
+    def flag_at_times(self, start_time, duration, times, padding=0):
+        """ Check whether the idq flag was on at given times
 
         Parameters
         ----------
         start_time: int
             Beginning time to request for
         duration: int
             Number of seconds to check.
+        times: array of floats
+            Times to check for an active flag
         padding: float
-            Number of seconds to add around flag inactive times to be considered
-            inactive as well.
+            Amount of time in seconds to flag around samples
+            below the iDQ FAP threshold
 
         Returns
        -------
-        indices: numpy.ndarray
-            Array of indices marking the location of triggers within valid
-            time.
+        flag_state: numpy.ndarray
+            Boolean array of whether flag was on at given times
         """
-        from pycbc.events.veto import indices_outside_times
+        from pycbc.events.veto import indices_within_times
 
+        # convert start and end times to buffer indices
         sr = self.idq.raw_buffer.sample_rate
         s = int((start_time - self.idq.raw_buffer.start_time - padding) * sr) - 1
         e = s + int((duration + padding) * sr) + 1
+
+        # find samples when iDQ FAP is below threshold and state is valid
         idq_fap = self.idq.raw_buffer[s:e]
-        stamps = idq_fap.sample_times.numpy()
         low_fap = idq_fap.numpy() <= self.threshold
         idq_valid = self.idq_state.raw_buffer[s:e]
         idq_valid = idq_valid.numpy().astype(bool)
         valid_low_fap = numpy.logical_and(idq_valid, low_fap)
+
+        # find times corresponding to the valid low FAP samples
         glitch_idx = numpy.flatnonzero(valid_low_fap)
+        stamps = idq_fap.sample_times.numpy()
         glitch_times = stamps[glitch_idx]
+
+        # construct start and end times of flag segments
         starts = glitch_times - padding
         ends = starts + 1.0 / sr + padding * 2.0
-        idx = indices_outside_times(times, starts, ends)
-        return idx
+
+        # check if times were flagged
+        idx = indices_within_times(times, starts, ends)
+        flagged_bool = numpy.zeros(len(times), dtype=bool)
+        flagged_bool[idx] = True
+        return flagged_bool
 
     def advance(self, blocksize):
         """ Add blocksize seconds more to the buffer, push blocksize seconds
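The core of flag_at_times() can be sketched without the buffer machinery. In this standalone toy, the function name, the uniform-sampling assumption, and the plain-numpy membership test (standing in for pycbc.events.veto.indices_within_times) are mine, not the PR's:

import numpy

def flag_at_times_toy(sample_times, fap, valid, threshold, times, padding=0.0):
    sr = 1.0 / (sample_times[1] - sample_times[0])

    # samples where the iDQ FAP is at or below threshold and state is valid
    glitch_times = sample_times[valid & (fap <= threshold)]

    # pad each flagged sample out to a [start, end) segment
    starts = glitch_times - padding
    ends = starts + 1.0 / sr + 2.0 * padding

    # a time is flagged if it falls inside any segment
    flagged = numpy.zeros(len(times), dtype=bool)
    for seg_start, seg_end in zip(starts, ends):
        flagged |= (times >= seg_start) & (times < seg_end)
    return flagged

# the flag is active around t=2, so only the middle trigger is caught
t = numpy.arange(0.0, 4.0, 1.0)             # 1 Hz iDQ samples
fap = numpy.array([0.9, 0.9, 0.01, 0.9])    # low FAP at t=2
valid = numpy.ones(4, dtype=bool)
print(flag_at_times_toy(t, fap, valid, 0.1, numpy.array([0.5, 2.3, 3.7])))
# -> [False  True False]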