Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove checkpointing #3906

Merged
merged 7 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ Cylc Review was also removed in this version.
Cylc 7 suites cannot be restarted in Cylc 8 using `cylc restart`, but they
can still be run using `cylc run` ([#3863](https://github.com/cylc/cylc-flow/pull/3863)).

Named checkpoints have been removed ([#3906](https://github.com/cylc/cylc-flow/pull/3906))
due to being a seldom-used feature. Workflows can still be restarted from the
last run, or reflow can be used to achieve the same result.

-------------------------------------------------------------------------------
## __cylc-8.0a3 (2020-08?)__

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc-bash-completion
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ _cylc() {
cur="${COMP_WORDS[COMP_CWORD]}"
sec="${COMP_WORDS[1]}"
opts="$(cylc scan -t name 2>/dev/null)"
suite_cmds="broadcast|bcast|cat-log|log|check-versions|checkpoint|diff|compare|dump|edit|ext-trigger|external-trigger|get-directory|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|graph-diff|hold|insert|kill|list|ls|ls-checkpoints|tui|ping|poll|print|register|release|unhold|reload|remove|report-timings|reset|restart|run|start|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"
suite_cmds="broadcast|bcast|cat-log|log|check-versions|diff|compare|dump|edit|ext-trigger|external-trigger|get-directory|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|graph-diff|hold|insert|kill|list|ls|tui|ping|poll|print|register|release|unhold|reload|remove|report-timings|reset|restart|run|start|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"


if [[ ${COMP_CWORD} -eq 1 ]]; then
Expand Down
18 changes: 0 additions & 18 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,24 +756,6 @@ def stop(
))
return (True, 'Command queued')

def take_checkpoints(self, name):
"""Checkpoint current task pool.

Args:
name (str): The checkpoint name

Returns:
tuple: (outcome, message)

outcome (bool)
True if command successfully queued.
message (str)
Information about outcome.

"""
self.schd.command_queue.put(("take_checkpoints", (name,), {}))
return (True, 'Command queued')

def force_trigger_tasks(self, tasks, reflow=False):
"""Trigger submission of task jobs where possible.

Expand Down
17 changes: 0 additions & 17 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1551,21 +1551,6 @@ class Arguments:
result = GenericScalar()


class Checkpoint(Mutation):
class Meta:
description = 'Tell the suite to checkpoint its current state.'
resolver = partial(mutator, command='take_checkpoints')

class Arguments:
workflows = List(WorkflowID, required=True)
name = String(
description='The checkpoint name.',
required=True
)

result = GenericScalar()


class ExtTrigger(Mutation):
class Meta:
description = sstrip('''
Expand Down Expand Up @@ -1703,8 +1688,6 @@ class Mutations(ObjectType):
set_verbosity = SetVerbosity.Field(
description=SetVerbosity._meta.description)
stop = Stop.Field(description=Stop._meta.description)
checkpoint = Checkpoint.Field(
description=Checkpoint._meta.description)

# task actions
kill = Kill.Field(description=Kill._meta.description)
Expand Down
186 changes: 29 additions & 157 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from cylc.flow import LOG
import cylc.flow.flags
from cylc.flow.wallclock import get_current_time_string


class CylcSuiteDAOTableColumn:
Expand Down Expand Up @@ -169,23 +168,17 @@ class CylcSuiteDAO:
DB_FILE_BASE_NAME = "db"
MAX_TRIES = 100
RESTART_INCOMPAT_VERSION = "8.0a2" # Can't restart suite if <= this vers
CHECKPOINT_LATEST_ID = 0
CHECKPOINT_LATEST_EVENT = "latest"
TABLE_BROADCAST_EVENTS = "broadcast_events"
TABLE_BROADCAST_STATES = "broadcast_states"
TABLE_BROADCAST_STATES_CHECKPOINTS = "broadcast_states_checkpoints"
TABLE_INHERITANCE = "inheritance"
TABLE_SUITE_PARAMS = "suite_params"
TABLE_SUITE_PARAMS_CHECKPOINTS = "suite_params_checkpoints"
TABLE_SUITE_TEMPLATE_VARS = "suite_template_vars"
TABLE_TASK_JOBS = "task_jobs"
TABLE_TASK_EVENTS = "task_events"
TABLE_TASK_ACTION_TIMERS = "task_action_timers"
TABLE_CHECKPOINT_ID = "checkpoint_id"
TABLE_TASK_LATE_FLAGS = "task_late_flags"
TABLE_TASK_OUTPUTS = "task_outputs"
TABLE_TASK_POOL = "task_pool"
TABLE_TASK_POOL_CHECKPOINTS = "task_pool_checkpoints"
TABLE_TASK_PREREQUISITES = "task_prerequisites"
TABLE_TASK_STATES = "task_states"
TABLE_TASK_TIMEOUT_TIMERS = "task_timeout_timers"
Expand All @@ -207,18 +200,6 @@ class CylcSuiteDAO:
["key", {"is_primary_key": True}],
["value"],
],
TABLE_BROADCAST_STATES_CHECKPOINTS: [
["id", {"datatype": "INTEGER", "is_primary_key": True}],
["point", {"is_primary_key": True}],
["namespace", {"is_primary_key": True}],
["key", {"is_primary_key": True}],
["value"],
],
TABLE_CHECKPOINT_ID: [
["id", {"datatype": "INTEGER", "is_primary_key": True}],
["time"],
["event"],
],
TABLE_INHERITANCE: [
["namespace", {"is_primary_key": True}],
["inheritance"],
Expand All @@ -227,11 +208,6 @@ class CylcSuiteDAO:
["key", {"is_primary_key": True}],
["value"],
],
TABLE_SUITE_PARAMS_CHECKPOINTS: [
["id", {"datatype": "INTEGER", "is_primary_key": True}],
["key", {"is_primary_key": True}],
["value"],
],
TABLE_SUITE_TEMPLATE_VARS: [
["key", {"is_primary_key": True}],
["value"],
Expand Down Expand Up @@ -300,14 +276,6 @@ class CylcSuiteDAO:
["signature", {"is_primary_key": True}],
["results"],
],
TABLE_TASK_POOL_CHECKPOINTS: [
["id", {"datatype": "INTEGER", "is_primary_key": True}],
["cycle", {"is_primary_key": True}],
["name", {"is_primary_key": True}],
["flow_label", {"is_primary_key": True}],
["status"],
["is_held", {"datatype": "INTEGER"}],
],
TABLE_TASK_STATES: [
["name", {"is_primary_key": True}],
["cycle", {"is_primary_key": True}],
Expand Down Expand Up @@ -487,78 +455,44 @@ def _execute_stmt(self, stmt, stmt_args_list):
LOG.warning(err_log)
raise

def pre_select_broadcast_states(self, id_key=None, order=None):
def pre_select_broadcast_states(self, order=None):
"""Query statement and args formation for select_broadcast_states."""
form_stmt = r"SELECT point,namespace,key,value FROM %s"
if order == "ASC":
ordering = " ORDER BY point ASC, namespace ASC, key ASC"
form_stmt = form_stmt + ordering
if id_key is None or id_key == self.CHECKPOINT_LATEST_ID:
return form_stmt % self.TABLE_BROADCAST_STATES, []
else:
return (form_stmt % self.TABLE_BROADCAST_STATES_CHECKPOINTS +
r" WHERE id==?"), [id_key]
return form_stmt % self.TABLE_BROADCAST_STATES

def select_broadcast_states(self, callback, id_key=None, sort=None):
"""Select from broadcast_states or broadcast_states_checkpoints.
def select_broadcast_states(self, callback, sort=None):
"""Select from broadcast_states.

Invoke callback(row_idx, row) on each row, where each row contains:
[point, namespace, key, value]

If id_key is specified,
select from broadcast_states table if id_key == CHECKPOINT_LATEST_ID.
Otherwise select from broadcast_states_checkpoints where id == id_key.
"""
stmt, stmt_args = self.pre_select_broadcast_states(id_key=id_key,
order=sort)
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
stmt = self.pre_select_broadcast_states(order=sort)
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_checkpoint_id(self, callback, id_key=None):
"""Select from checkpoint_id.
def select_suite_params(self, callback):
"""Select from suite_params.

Invoke callback(row_idx, row) on each row, where each row contains:
[id, time, event]
[key, value]

If id_key is specified, add where id == id_key to select.
E.g. a row might be ['UTC mode', '1']
"""
stmt = r"SELECT id,time,event FROM checkpoint_id"
stmt_args = []
if id_key is not None:
stmt += r" WHERE id==?"
stmt_args.append(id_key)
stmt += r" ORDER BY time ASC"
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
stmt = f"SELECT key, value FROM {self.TABLE_SUITE_PARAMS}"
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_checkpoint_id_restart_count(self):
"""Return number of restart event in checkpoint_id table."""
stmt = r"SELECT COUNT(event) FROM checkpoint_id WHERE event==?"
stmt_args = ['restart']
for row in self.connect().execute(stmt, stmt_args):
return row[0]
return 0

def select_suite_params(self, callback, id_key=None):
"""Select from suite_params or suite_params_checkpoints.

Invoke callback(row_idx, row) on each row, where each row contains:
[key,value]

If id_key is specified,
select from suite_params table if id_key == CHECKPOINT_LATEST_ID.
Otherwise select from suite_params_checkpoints where id == id_key.
def select_suite_params_restart_count(self):
"""Return number of restarts in suite_params table."""
stmt = f"""
SELECT value FROM {self.TABLE_SUITE_PARAMS}
WHERE key == 'n_restart';
"""
form_stmt = r"SELECT key,value FROM %s"
if id_key is None or id_key == self.CHECKPOINT_LATEST_ID:
stmt = form_stmt % self.TABLE_SUITE_PARAMS
stmt_args = []
else:
stmt = (form_stmt % self.TABLE_SUITE_PARAMS_CHECKPOINTS +
r" WHERE id==?")
stmt_args = [id_key]
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
callback(row_idx, list(row))
result = self.connect().execute(stmt).fetchone()
return int(result[0]) if result else 0

def select_suite_template_vars(self, callback):
"""Select from suite_template_vars.
Expand Down Expand Up @@ -614,16 +548,12 @@ def select_task_job(self, cycle, name, submit_num=None):
except sqlite3.DatabaseError:
return None

def select_job_pool_for_restart(self, callback, id_key=None):
def select_job_pool_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.

Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, status, submit_num, time_submit, time_run,
time_run_exit, batch_sys_name, batch_sys_job_id, platform_name]

If id_key is specified,
select from task_pool table if id_key == CHECKPOINT_LATEST_ID.
Otherwise select from task_pool_checkpoints where id == id_key.
"""
form_stmt = r"""
SELECT
Expand Down Expand Up @@ -654,14 +584,8 @@ def select_job_pool_for_restart(self, callback, id_key=None):
"task_states": self.TABLE_TASK_STATES,
"task_jobs": self.TABLE_TASK_JOBS,
}
if id_key is None or id_key == self.CHECKPOINT_LATEST_ID:
stmt = form_stmt % form_data
stmt_args = []
else:
form_data["task_pool"] = self.TABLE_TASK_POOL_CHECKPOINTS
stmt = (form_stmt + r" WHERE %(task_pool)s.id==?") % form_data
stmt_args = [id_key]
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
stmt = form_stmt % form_data
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_task_job_run_times(self, callback):
Expand Down Expand Up @@ -722,37 +646,23 @@ def select_abs_outputs_for_restart(self, callback):
for row_idx, row in enumerate(self.connect().execute(stm, [])):
callback(row_idx, list(row))

def select_task_pool(self, callback, id_key=None):
"""Select from task_pool or task_pool_checkpoints.
def select_task_pool(self, callback):
"""Select from task_pool.

Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, status]

If id_key is specified,
select from task_pool table if id_key == CHECKPOINT_LATEST_ID.
Otherwise select from task_pool_checkpoints where id == id_key.
"""
form_stmt = r"SELECT cycle,name,status,is_held FROM %s"
if id_key is None or id_key == self.CHECKPOINT_LATEST_ID:
stmt = form_stmt % self.TABLE_TASK_POOL
stmt_args = []
else:
stmt = (
form_stmt % self.TABLE_TASK_POOL_CHECKPOINTS + r" WHERE id==?")
stmt_args = [id_key]
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
stmt = form_stmt % self.TABLE_TASK_POOL
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_task_pool_for_restart(self, callback, id_key=None):
def select_task_pool_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.

Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, is_late, status, is_held, submit_num,
try_num, platform_name, time_submit, time_run, timeout, outputs]

If id_key is specified,
select from task_pool table if id_key == CHECKPOINT_LATEST_ID.
Otherwise select from task_pool_checkpoints where id == id_key.
"""
form_stmt = r"""
SELECT
Expand Down Expand Up @@ -802,14 +712,8 @@ def select_task_pool_for_restart(self, callback, id_key=None):
"task_jobs": self.TABLE_TASK_JOBS,
"task_outputs": self.TABLE_TASK_OUTPUTS,
}
if id_key is None or id_key == self.CHECKPOINT_LATEST_ID:
stmt = form_stmt % form_data
stmt_args = []
else:
form_data["task_pool"] = self.TABLE_TASK_POOL_CHECKPOINTS
stmt = (form_stmt + r" WHERE %(task_pool)s.id==?") % form_data
stmt_args = [id_key]
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
stmt = form_stmt % form_data
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_task_prerequisites(self, cycle, name):
Expand Down Expand Up @@ -857,38 +761,6 @@ def select_task_times(self):
)
return columns, [r for r in self.connect().execute(q)]

def take_checkpoints(self, event, other_daos=None):
"""Add insert items to *_checkpoints tables.

Select items in suite_params, broadcast_states and task_pool and
prepare them for insert into the relevant *_checkpoints tables, and
prepare an insert into the checkpoint_id table the event and the
current time.

If other_daos is a specified, it should be a list of CylcSuiteDAO
objects. The logic will prepare insertion of the same items into the
*_checkpoints tables of these DAOs as well.
"""
id_ = 1
for max_id, in self.connect().execute(
"SELECT MAX(id) FROM checkpoint_id"):
if max_id is not None and max_id >= id_:
id_ = max_id + 1
daos = [self]
if other_daos:
daos.extend(other_daos)
for dao in daos:
dao.tables[self.TABLE_CHECKPOINT_ID].add_insert_item([
id_, get_current_time_string(), event])
for table_name in [
self.TABLE_SUITE_PARAMS,
self.TABLE_BROADCAST_STATES,
self.TABLE_TASK_POOL]:
for row in self.connect().execute("SELECT * FROM %s" % table_name):
for dao in daos:
dao.tables[table_name + "_checkpoints"].add_insert_item(
[id_] + list(row))

def vacuum(self):
"""Vacuum to the database."""
return self.connect().execute("VACUUM")
Expand Down
Loading