Skip to content

Commit

Permalink
cylc.rundb: improve commit/rollback logic
Browse files Browse the repository at this point in the history
Commit and delete all queued items only on success. Roll back if
anything dies, and retry all queued items next time. This ensures
statements are never missed on retry.
Print statement diagnostics when connecting to the DB for a write fails.
  • Loading branch information
matthewrmshin committed Apr 14, 2016
1 parent 3fb3f34 commit adcabba
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 28 deletions.
60 changes: 33 additions & 27 deletions lib/cylc/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,73 +320,79 @@ def execute_queued_items(self):
# DELETE statements may have varying number of WHERE args so we
# can only executemany for each identical template statement.
for stmt, stmt_args_list in table.delete_queues.items():
self._execute_stmt(table, stmt, stmt_args_list)
table.delete_queues.pop(stmt)
self._execute_stmt(stmt, stmt_args_list)
# INSERT statements are uniform for each table, so all INSERT
# statements can be executed using a single "executemany" call.
if table.insert_queue:
self._execute_stmt(
table, table.get_insert_stmt(), table.insert_queue)
table.insert_queue = []
table.get_insert_stmt(), table.insert_queue)
# UPDATE statements can have varying number of SET and WHERE
# args so we can only executemany for each identical template
# statement.
for stmt, stmt_args_list in table.update_queues.items():
self._execute_stmt(table, stmt, stmt_args_list)
table.update_queues.pop(stmt)
if self.conn is not None:
self.conn.commit()
self._execute_stmt(stmt, stmt_args_list)
# Connection should only be opened if we have executed something.
if self.conn is None:
return
self.conn.commit()
except sqlite3.Error:
if not self.is_public:
raise
self.conn.rollback()
if cylc.flags.debug:
traceback.print_exc()
sys.stderr.write(
"WARNING: %s: db write failed\n" % self.db_file_name)
self.n_tries += 1
logger = getLogger("main")
logger.log(
WARNING,
"%(file)s: write attempt (%(attempt)d) did not complete\n" % {
"file": self.db_file_name, "attempt": self.n_tries})
if self.conn is not None:
try:
self.conn.rollback()
except sqlite3.Error:
pass
return
else:
# Clear the queues
for table in self.tables.values():
table.delete_queues.clear()
del table.insert_queue[:] # list.clear avail from Python 3.3
table.update_queues.clear()
# Report public database retry recovery if necessary
if self.n_tries:
logger = getLogger("main")
logger.log(
WARNING,
"%(file)s: recovered after (%(attempt)d) attempt(s)\n" % {
"file": self.db_file_name, "attempt": self.n_tries})
self.n_tries = 0
finally:
# Note: This is not strictly necessary. However, if the suite run
# directory is removed, a forced reconnection to the private
# database will ensure that the suite dies.
self.close()

# N.B. This is not strictly necessary. However, if the suite run
# directory is removed, a forced reconnection to the private database
# will ensure that the suite dies.
self.close()

def _execute_stmt(self, table, stmt, stmt_args_list):
def _execute_stmt(self, stmt, stmt_args_list):
"""Helper for "self.execute_queued_items".
Execute a statement with "executemany". On failure, re-raise the
"sqlite3.Error" to the caller; if this is the public database, first
write the statement and its arguments to STDERR as diagnostics.
"""
self.connect()
try:
self.connect()
self.conn.executemany(stmt, stmt_args_list)
except sqlite3.Error:
if not self.is_public:
raise
if cylc.flags.debug:
traceback.print_exc()
sys.stderr.write(
"WARNING: %(file)s: %(table)s: %(stmt)s\n" % {
"file": self.db_file_name,
"table": table.name,
"stmt": stmt})
for stmt_args in stmt_args_list:
sys.stderr.write("\t%(stmt_args)s\n" % {
"stmt_args": stmt_args})
(
"WARNING: cannot execute database statement:\n" +
"\tfile=%(file)s:\n\tstmt=%(stmt)s\n"
) % {"file": self.db_file_name, "stmt": stmt})
for i, stmt_args in enumerate(stmt_args_list):
sys.stderr.write("\tstmt_args[%(i)d]=%(stmt_args)s\n" % {
"i": i, "stmt_args": stmt_args})
raise

def select_all_task_jobs(self, keys):
Expand Down
18 changes: 17 additions & 1 deletion tests/database/04-lock-recover.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,29 @@
#-------------------------------------------------------------------------------
# Suite database content, "task_jobs" table after a task retries.
. "$(dirname "$0")/test_header"
set_test_number 3
set_test_number 4
install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
suite_run_ok "${TEST_NAME_BASE}-run" \
cylc run --debug --reference-test "${SUITE_NAME}"

# Ensure that DB statement and its args are printed to STDERR
grep -A 3 -F 'WARNING: cannot execute database statement:' \
"${TEST_NAME_BASE}-run.stderr" > "${TEST_NAME_BASE}-run.stderr.grep"
# The following "sed" turns the value for "time_submit_exit" to "?"
sed -i "s/=\\['[^ ]*Z',/=['?',/" \
"${TEST_NAME_BASE}-run.stderr.grep"
SUITE_RUN_DIR="$(cylc get-global-config --print-run-dir)/${SUITE_NAME}"
JOB_PID="$(awk -F'=' '$1 == "CYLC_JOB_PID" {print $2}' \
"${SUITE_RUN_DIR}/log/job/1/locker/01/job.status")" >&2
cmp_ok "${TEST_NAME_BASE}-run.stderr.grep" <<__ERR__
WARNING: cannot execute database statement:
file=${SUITE_RUN_DIR}/cylc-suite.db:
stmt=UPDATE task_jobs SET time_submit_exit=?, submit_status=?, batch_sys_job_id=? WHERE cycle==? AND name==? AND submit_num==?
stmt_args[0]=['?', 0, '${JOB_PID}', '1', 'locker', 1]
__ERR__

DB_FILE="$(cylc get-global-config '--print-run-dir')/${SUITE_NAME}/cylc-suite.db"

NAME='select-task-states.out'
Expand Down

0 comments on commit adcabba

Please sign in to comment.