Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve task polling and timeout handling #2593

Merged
merged 5 commits into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/src/cylc-user-guide/suiterc.tex
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,11 @@ \subsection{[runtime]}
For \lstinline=background= and \lstinline=at=, the job script will be invoked using the \lstinline=timeout= command.
For other batch systems, the specified time will be automatically translated into the equivalent directive for wall clock limit.

Tasks are polled multiple times, where necessary, when they exceed their
execution time limits.
(See~\ref{ExecutionTimeLimitPollingIntervals} for how to configure the polling
intervals).

\begin{myitemize}
\item {\em type:} ISO 8601 duration/interval representation
\item {\em example:} \lstinline=PT5M=, 5 minutes, \lstinline=PT1H=, 1 hour
Expand Down
39 changes: 24 additions & 15 deletions lib/cylc/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,24 +325,33 @@ def put_task_pool(self, pool):
"spawned": int(itask.has_spawned),
"status": itask.state.status,
"hold_swap": itask.state.hold_swap})
if itask.state.status in itask.timeout_timers:
if itask.timeout is not None:
self.db_inserts_map[self.TABLE_TASK_TIMEOUT_TIMERS].append({
"name": itask.tdef.name,
"cycle": str(itask.point),
"timeout": itask.timeout_timers[itask.state.status]})
for ctx_key_0 in ["poll_timers", "try_timers"]:
for ctx_key_1, timer in getattr(itask, ctx_key_0).items():
if timer is None:
continue
self.db_inserts_map[self.TABLE_TASK_ACTION_TIMERS].append({
"name": itask.tdef.name,
"cycle": str(itask.point),
"ctx_key": json.dumps((ctx_key_0, ctx_key_1)),
"ctx": self._namedtuple2json(timer.ctx),
"delays": json.dumps(timer.delays),
"num": timer.num,
"delay": timer.delay,
"timeout": timer.timeout})
"timeout": itask.timeout})
if itask.poll_timer is not None:
self.db_inserts_map[self.TABLE_TASK_ACTION_TIMERS].append({
"name": itask.tdef.name,
"cycle": str(itask.point),
"ctx_key": "poll_timer",
"ctx": self._namedtuple2json(itask.poll_timer.ctx),
"delays": json.dumps(itask.poll_timer.delays),
"num": itask.poll_timer.num,
"delay": itask.poll_timer.delay,
"timeout": itask.poll_timer.timeout})
for ctx_key_1, timer in itask.try_timers.items():
if timer is None:
continue
self.db_inserts_map[self.TABLE_TASK_ACTION_TIMERS].append({
"name": itask.tdef.name,
"cycle": str(itask.point),
"ctx_key": json.dumps(("try_timers", ctx_key_1)),
"ctx": self._namedtuple2json(timer.ctx),
"delays": json.dumps(timer.delays),
"num": timer.num,
"delay": timer.delay,
"timeout": timer.timeout})
if itask.state.time_updated:
set_args = {
"time_updated": itask.state.time_updated,
Expand Down
12 changes: 5 additions & 7 deletions lib/cylc/task_action_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ def __init__(self, ctx=None, delays=None, num=0, delay=None, timeout=None):
self.timeout = timeout
self.is_waiting = False

def delay_as_seconds(self):
"""Return the delay as PTnS, where n is number of seconds."""
return get_seconds_as_interval_string(self.delay)
def delay_timeout_as_str(self):
"""Return a string in the form "delay (after timeout)"."""
return r"%s (after %s)" % (
get_seconds_as_interval_string(self.delay),
get_time_string_from_unix_time(self.timeout))

def is_delay_done(self, now=None):
"""Is timeout done?"""
Expand Down Expand Up @@ -99,7 +101,3 @@ def set_waiting(self):
def unset_waiting(self):
"""Unset waiting flag after an action has completed."""
self.is_waiting = False

def timeout_as_str(self):
"""Return the timeout as an ISO8601 date-time string."""
return get_time_string_from_unix_time(self.timeout)
202 changes: 142 additions & 60 deletions lib/cylc/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
from cylc.task_message import (
ABORT_MESSAGE_PREFIX, FAIL_MESSAGE_PREFIX, VACATION_MESSAGE_PREFIX)
from cylc.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUS_READY, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_RUNNING, TASK_STATUS_RETRYING,
TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED)
from cylc.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_EXPIRED)
from cylc.wallclock import get_current_time_string
from cylc.wallclock import (
get_current_time_string, get_seconds_as_interval_string as intvl_as_str)


CustomTaskEventHandlerContext = namedtuple(
Expand Down Expand Up @@ -119,6 +121,7 @@ class TaskEventsManager(object):
HANDLER_JOB_LOGS_RETRIEVE = "job-logs-retrieve"
IGNORED_INCOMING_FLAG = "(>ignored)"
INCOMING_FLAG = ">"
KEY_EXECUTE_TIME_LIMIT = 'execution_time_limit'
LEVELS = {
"INFO": INFO,
"NORMAL": INFO,
Expand Down Expand Up @@ -149,6 +152,57 @@ def __init__(self, suite, proc_pool, suite_db_mgr, broadcast_mgr=None):
# when required.
self.pflag = False

@staticmethod
def check_poll_time(itask, now=None):
"""Set the next task execution/submission poll time.

If now is set, set the timer only if the previous delay is done.
Return the next delay.
"""
if itask.state.status not in TASK_STATUSES_ACTIVE:
# Reset, task not active
itask.timeout = None
itask.poll_timer = None
return None
ctx = (itask.submit_num, itask.state.status)
if itask.poll_timer is None or itask.poll_timer.ctx != ctx:
# Reset, timer no longer relevant
itask.timeout = None
itask.poll_timer = None
return None
if now is not None and not itask.poll_timer.is_delay_done(now):
return False
if itask.poll_timer.num is None:
itask.poll_timer.num = 0
itask.poll_timer.next(no_exhaust=True)
return True

def check_job_time(self, itask, now):
"""Check/handle job timeout and poll timer"""
can_poll = self.check_poll_time(itask, now)
if itask.timeout is None or now <= itask.timeout:
return can_poll
# Timeout reached for task, emit event and reset itask.timeout
if itask.state.status == TASK_STATUS_RUNNING:
time_ref = itask.summary['started_time']
event = 'execution timeout'
elif itask.state.status == TASK_STATUS_SUBMITTED:
time_ref = itask.summary['submitted_time']
event = 'submission timeout'
msg = event
try:
msg += ' after %s' % intvl_as_str(itask.timeout - time_ref)
except (TypeError, ValueError):
# Badness in time_ref?
pass
itask.timeout = None # emit event only once
if msg and event:
LOG.warning(msg, itask=itask)
self.setup_event_handlers(itask, event, msg)
return True
else:
return can_poll

def get_host_conf(self, itask, key, default=None, skey="remote"):
"""Return a host setting from suite then global configuration."""
overrides = self.broadcast_mgr.get_broadcast(itask.identity)
Expand Down Expand Up @@ -185,14 +239,13 @@ def process_events(self, schd_ctx):
# Report retries and delayed 1st try
tmpl = None
if timer.num > 1:
tmpl = "%s/%s/%02d %s failed, retrying in %s (after %s)"
tmpl = "%s/%s/%02d %s failed, retrying in %s"
elif timer.delay:
tmpl = "%s/%s/%02d %s will run after %s (after %s)"
tmpl = "%s/%s/%02d %s will run after %s"
if tmpl:
LOG.debug(tmpl % (
point, name, submit_num, key1,
timer.delay_as_seconds(),
timer.timeout_as_str()))
timer.delay_timeout_as_str()))
# Ready to run?
if not timer.is_delay_done() or (
# Avoid flooding user's mail box with mail notification.
Expand Down Expand Up @@ -339,15 +392,10 @@ def process_message(self, itask, severity, message, poll_func,
if TASK_STATUS_SUBMIT_RETRYING in itask.try_timers:
itask.try_timers[TASK_STATUS_SUBMIT_RETRYING].num = 0
itask.job_vacated = True
try:
itask.timeout_timers[TASK_STATUS_SUBMITTED] = (
itask.summary['submitted_time'] +
float(self._get_events_conf(itask, 'submission timeout')))
except (TypeError, ValueError):
itask.timeout_timers[TASK_STATUS_SUBMITTED] = None
# Believe this and change state without polling (could poll?).
self.pflag = True
itask.state.reset_state(TASK_STATUS_SUBMITTED)
self._reset_job_timers(itask)
elif an_output_was_satisfied:
# Message of an as-yet unreported custom task output.
# No state change.
Expand Down Expand Up @@ -381,29 +429,6 @@ def setup_event_handlers(self, itask, event, message):
self._setup_event_mail(itask, event)
self._setup_custom_event_handlers(itask, event, message)

@staticmethod
def set_poll_time(itask, now=None):
"""Set the next task execution/submission poll time.

If now is set, set the timer only if the previous delay is done.
Return the next delay.
"""
key = itask.state.status
timer = itask.poll_timers.get(key)
if timer is None:
return
if now is not None and not timer.is_delay_done(now):
return
if timer.num is None:
timer.num = 0
delay = timer.next(no_exhaust=True)
if delay is not None:
LOG.info(
'next job poll in %s (after %s)' % (
timer.delay_as_seconds(), timer.timeout_as_str()),
itask=itask)
return delay

def _custom_handler_callback(self, ctx, schd_ctx, id_key):
"""Callback when a custom event handler is done."""
_, point, name, submit_num = id_key
Expand Down Expand Up @@ -593,16 +618,15 @@ def _process_message_failed(self, itask, event_time, message):
itask.submit_num, "failed"), itask=itask)
else:
# There is a retry lined up
timeout_str = (
itask.try_timers[TASK_STATUS_RETRYING].timeout_as_str())
delay_msg = "retrying in %s" % (
itask.try_timers[TASK_STATUS_RETRYING].delay_as_seconds())
msg = "failed, %s (after %s)" % (delay_msg, timeout_str)
itask.try_timers[TASK_STATUS_RETRYING].delay_timeout_as_str())
msg = "failed, %s" % (delay_msg)
LOG.info("job(%02d) %s" % (itask.submit_num, msg), itask=itask)
itask.summary['latest_message'] = msg
self.setup_event_handlers(
itask, "retry", "%s, %s" % (self.JOB_FAILED, delay_msg))
itask.state.reset_state(TASK_STATUS_RETRYING)
self._reset_job_timers(itask)

def _process_message_started(self, itask, event_time):
"""Helper for process_message, handle a started message."""
Expand All @@ -612,24 +636,14 @@ def _process_message_started(self, itask, event_time):
self.pflag = True
itask.state.reset_state(TASK_STATUS_RUNNING)
itask.set_summary_time('started', event_time)
self._reset_job_timers(itask)
self.suite_db_mgr.put_update_task_jobs(itask, {
"time_run": itask.summary['started_time_string']})
if itask.summary['execution_time_limit']:
execution_timeout = itask.summary['execution_time_limit']
else:
execution_timeout = self._get_events_conf(
itask, 'execution timeout')
try:
itask.timeout_timers[TASK_STATUS_RUNNING] = (
itask.summary['started_time'] + float(execution_timeout))
except (TypeError, ValueError):
itask.timeout_timers[TASK_STATUS_RUNNING] = None

# submission was successful so reset submission try number
if TASK_STATUS_SUBMIT_RETRYING in itask.try_timers:
itask.try_timers[TASK_STATUS_SUBMIT_RETRYING].num = 0
self.setup_event_handlers(itask, 'started', 'job started')
self.set_poll_time(itask)

def _process_message_succeeded(self, itask, event_time):
"""Helper for process_message, handle a succeeded message."""
Expand All @@ -656,6 +670,7 @@ def _process_message_succeeded(self, itask, event_time):
itask=itask)
itask.state.reset_state(TASK_STATUS_SUCCEEDED)
self.setup_event_handlers(itask, "succeeded", "job succeeded")
self._reset_job_timers(itask)

def _process_message_submit_failed(self, itask, event_time):
"""Helper for process_message, handle a submit-failed message."""
Expand All @@ -679,16 +694,15 @@ def _process_message_submit_failed(self, itask, event_time):
else:
# There is a submission retry lined up.
timer = itask.try_timers[TASK_STATUS_SUBMIT_RETRYING]
timeout_str = timer.timeout_as_str()
delay_msg = "submit-retrying in %s" % timer.delay_as_seconds()
msg = "%s, %s (after %s)" % (
self.EVENT_SUBMIT_FAILED, delay_msg, timeout_str)
delay_msg = "submit-retrying in %s" % timer.delay_timeout_as_str()
msg = "%s, %s" % (self.EVENT_SUBMIT_FAILED, delay_msg)
LOG.info("job(%02d) %s" % (itask.submit_num, msg), itask=itask)
itask.summary['latest_message'] = msg
self.setup_event_handlers(
itask, self.EVENT_SUBMIT_RETRY,
"job %s, %s" % (self.EVENT_SUBMIT_FAILED, delay_msg))
itask.state.reset_state(TASK_STATUS_SUBMIT_RETRYING)
self._reset_job_timers(itask)

def _process_message_submitted(self, itask, event_time):
"""Helper for process_message, handle a submit-succeeded message."""
Expand Down Expand Up @@ -726,13 +740,7 @@ def _process_message_submitted(self, itask, event_time):
# The job started message can (rarely) come in before the submit
# command returns - in which case do not go back to 'submitted'.
itask.state.reset_state(TASK_STATUS_SUBMITTED)
try:
itask.timeout_timers[TASK_STATUS_SUBMITTED] = (
itask.summary['submitted_time'] +
float(self._get_events_conf(itask, 'submission timeout')))
except (TypeError, ValueError):
itask.timeout_timers[TASK_STATUS_SUBMITTED] = None
self.set_poll_time(itask)
self._reset_job_timers(itask)

def _setup_job_logs_retrieval(self, itask, event):
"""Set up remote job logs retrieval.
Expand Down Expand Up @@ -877,3 +885,77 @@ def _setup_custom_event_handlers(self, itask, event, message):
cmd,
),
retry_delays))

def _reset_job_timers(self, itask):
"""Set up poll timer and timeout for task."""
if itask.state.status not in TASK_STATUSES_ACTIVE:
# Reset, task not active
itask.timeout = None
itask.poll_timer = None
return
ctx = (itask.submit_num, itask.state.status)
if itask.poll_timer and itask.poll_timer.ctx == ctx:
return
# Set poll timer
# Set timeout
timeref = None # reference time, submitted or started time
timeout = None # timeout in setting
if itask.state.status == TASK_STATUS_RUNNING:
timeref = itask.summary['started_time']
timeout_key = 'execution timeout'
timeout = self._get_events_conf(itask, timeout_key)
delays = self.get_host_conf(
itask, 'execution polling intervals', skey='job',
default=[900]) # Default 15 minute intervals
if itask.summary[self.KEY_EXECUTE_TIME_LIMIT]:
time_limit = itask.summary[self.KEY_EXECUTE_TIME_LIMIT]
try:
host_conf = self.get_host_conf(itask, 'batch systems')
batch_sys_conf = host_conf[itask.summary['batch_sys_name']]
except (TypeError, KeyError):
batch_sys_conf = {}
time_limit_delays = batch_sys_conf.get(
'execution time limit polling intervals', [60, 120, 420])
timeout = time_limit + sum(time_limit_delays)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception catch on 908 will result in time_limit_delays getting set to None, which will throw a TypeError for the sum() method, I believe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if batch_sys_conf is an empty dict, time_limit_delays will be set to the default value [60, 120, 420] via the 2nd argument of the .get method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, my mistake!

# Remove execessive polling before time limit
while sum(delays) > time_limit:
del delays[-1]
# But fill up the gap before time limit
if delays:
size = int((time_limit - sum(delays)) / delays[-1])
delays.extend([delays[-1]] * size)
time_limit_delays[0] += time_limit - sum(delays)
delays += time_limit_delays
else: # if itask.state.status == TASK_STATUS_SUBMITTED:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding an i.e. or similar to this comment as I went to check the nature of TASK_STATUSES_ACTIVE set to work out whether it was meant to be there thinking it e.g. might have been left in from the development stage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here is deliberate. It is saying that the else: statement:

else:  # if itask.state.status == TASK_STATUS_SUBMITTED:

is equivalent to:

elif itask.state.status == TASK_STATUS_SUBMITTED:

The bare else: is faster because it does not have to evaluate itask.state.status == TASK_STATUS_SUBMITTED.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood this, my point is that it wasn't immediately obvious to me that this comment was implying equivalence (hence the suggestion 'i.e.') as opposed to it being something left there during dev. Though it could easily just be me it was not at first evident to! Sorry if I was not clear.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point @sadielbartholomew but I think we use this idiom fairly often (possibly omitting the "if" in the comment though)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, it is clearly confusion arising from my lack of experience - when I see similar comments I will now know distinctly what they mean.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No apologies necessary! (and I wouldn't too surprised if you see a few things we do that aren't clear or correct).

timeref = itask.summary['submitted_time']
timeout_key = 'submission timeout'
timeout = self._get_events_conf(itask, timeout_key)
delays = self.get_host_conf(
itask, 'submission polling intervals', skey='job',
default=[900]) # Default 15 minute intervals
try:
itask.timeout = timeref + float(timeout)
timeout_str = intvl_as_str(timeout)
except (TypeError, ValueError):
itask.timeout = None
timeout_str = None
itask.poll_timer = TaskActionTimer(ctx=ctx, delays=delays)
# Log timeout and polling schedule
message = 'health check settings: %s=%s' % (timeout_key, timeout_str)
# Attempt to group idenitical consecutive delays as N*DELAY,...
if itask.poll_timer.delays:
items = [] # [(number of item - 1, item), ...]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

940:951 is nice! A clever way to process & report the delays.

for delay in itask.poll_timer.delays:
if items and items[-1][1] == delay:
items[-1][0] += 1
else:
items.append([0, delay])
message += ', polling intervals='
for num, item in items:
if num:
message += '%d*' % (num + 1)
message += '%s,' % intvl_as_str(item)
message += '...'
LOG.info(message, itask=itask)
# Set next poll time
self.check_poll_time(itask)
Loading