Skip to content

Commit

Permalink
Merge pull request #4891 from dwsutherland/job-db-load-fix
Browse files: browse the repository at this point in the history
Fix data-store DB job load
  • Loading branch information
MetRonnie authored Jun 1, 2022
2 parents 3a9fb46 + 8b03b68 commit 0c977d6
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 25 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ in `global.cylc[install]source dirs`.

### Fixes

[#4891](https://github.com/cylc/cylc-flow/pull/4891) - Fix a bug that could cause
past jobs to be omitted from the UI.

[#4860](https://github.com/cylc/cylc-flow/pull/4860) - Workflow config parsing
will fail if
[owner setting](https://cylc.github.io/cylc-doc/latest/html/reference/config/workflow.html#flow.cylc[runtime][%3Cnamespace%3E][remote]owner)
Expand Down Expand Up @@ -94,6 +97,8 @@ option for `cylc install` (the functionality has been merged into the
workflow source argument), and rename the `--flow-name` option to
`--workflow-name`.

### Fixes

[#4873](https://github.com/cylc/cylc-flow/pull/4873) - `cylc show`: don't
show prerequisites of past tasks recalled from the DB as unsatisfied.

Expand Down
35 changes: 31 additions & 4 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING,
Expand Down Expand Up @@ -149,6 +150,7 @@
DELTA_FIELDS = {DELTA_ADDED, DELTA_UPDATED, DELTA_PRUNED}

JOB_STATUSES_ALL = [
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING,
Expand Down Expand Up @@ -1191,10 +1193,35 @@ def insert_db_job(self, row_idx, row):
"""Load job element from DB post restart."""
if row_idx == 0:
LOG.info("LOADING job data")
(point_string, name, status, submit_num, time_submit, time_run,
time_run_exit, job_runner_name, job_id, platform_name) = row
if status not in JOB_STATUS_SET:
return
(
point_string,
name,
submit_num,
time_submit,
submit_status,
time_run,
time_run_exit,
run_status,
job_runner_name,
job_id,
platform_name
) = row

if run_status is not None:
if run_status == 0:
status = TASK_STATUS_SUCCEEDED
else:
status = TASK_STATUS_FAILED
elif time_run is not None:
status = TASK_STATUS_RUNNING
elif submit_status is not None:
if submit_status == 0:
status = TASK_STATUS_SUBMITTED
else:
status = TASK_STATUS_SUBMIT_FAILED
else:
status = TASK_STATUS_PREPARING

tp_id, tproxy = self.store_node_fetcher(name, point_string)
if not tproxy:
return
Expand Down
23 changes: 8 additions & 15 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,19 +639,18 @@ def select_task_job(self, cycle, name, submit_num=None):
def select_jobs_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.
Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, status, submit_num, time_submit, time_run,
time_run_exit, job_runner_name, job_id, platform_name]
Invoke callback(row_idx, row) on each row of the result.
"""
form_stmt = r"""
SELECT
%(task_pool)s.cycle,
%(task_pool)s.name,
%(task_pool)s.status,
%(task_states)s.submit_num,
%(task_jobs)s.submit_num,
%(task_jobs)s.time_submit,
%(task_jobs)s.submit_status,
%(task_jobs)s.time_run,
%(task_jobs)s.time_run_exit,
%(task_jobs)s.run_status,
%(task_jobs)s.job_runner_name,
%(task_jobs)s.job_id,
%(task_jobs)s.platform_name
Expand All @@ -661,15 +660,9 @@ def select_jobs_for_restart(self, callback):
%(task_pool)s
ON %(task_jobs)s.cycle == %(task_pool)s.cycle AND
%(task_jobs)s.name == %(task_pool)s.name
JOIN
%(task_states)s
ON %(task_jobs)s.cycle == %(task_states)s.cycle AND
%(task_jobs)s.name == %(task_states)s.name AND
%(task_jobs)s.submit_num == %(task_states)s.submit_num
"""
form_data = {
"task_pool": self.TABLE_TASK_POOL,
"task_states": self.TABLE_TASK_STATES,
"task_jobs": self.TABLE_TASK_JOBS,
}
stmt = form_stmt % form_data
Expand Down Expand Up @@ -970,11 +963,12 @@ def select_jobs_for_datastore(
SELECT
%(task_states)s.cycle,
%(task_states)s.name,
%(task_states)s.status,
%(task_states)s.submit_num,
%(task_jobs)s.submit_num,
%(task_jobs)s.time_submit,
%(task_jobs)s.submit_status,
%(task_jobs)s.time_run,
%(task_jobs)s.time_run_exit,
%(task_jobs)s.run_status,
%(task_jobs)s.job_runner_name,
%(task_jobs)s.job_id,
%(task_jobs)s.platform_name
Expand All @@ -983,8 +977,7 @@ def select_jobs_for_datastore(
JOIN
%(task_states)s
ON %(task_jobs)s.cycle == %(task_states)s.cycle AND
%(task_jobs)s.name == %(task_states)s.name AND
%(task_jobs)s.submit_num == %(task_states)s.submit_num
%(task_jobs)s.name == %(task_states)s.name
WHERE
%(task_states)s.cycle || '/' || %(task_states)s.name IN (
%(task_ids)s
Expand Down
57 changes: 53 additions & 4 deletions tests/functional/restart/03-retrying.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#-------------------------------------------------------------------------------
# Test restarting with a task waiting to retry (was retrying state).
. "$(dirname "$0")/test_header"
set_test_number 5
set_test_number 8
init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[scheduler]
[[events]]
Expand All @@ -32,14 +32,17 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[[t1]]
script = """
cylc__job__wait_cylc_message_started
if ((CYLC_TASK_TRY_NUMBER == 1)); then
if ((CYLC_TASK_TRY_NUMBER < 3)); then
exit 1
elif ((CYLC_TASK_TRY_NUMBER == 3)); then
cylc stop "${CYLC_WORKFLOW_ID}"
exit 1
fi
"""
[[[job]]]
execution retry delays = PT0S
execution retry delays = 3*PT0S
__FLOW_CONFIG__

#-------------------------------------------------------------------------------
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
Expand All @@ -48,8 +51,54 @@ sqlite3 "${WORKFLOW_RUN_DIR}/log/db" 'SELECT cycle, name, status FROM task_pool'
cmp_ok 'sqlite3.out' <<'__DB_DUMP__'
1|t1|waiting
__DB_DUMP__
workflow_run_ok "${TEST_NAME_BASE}-restart" \


workflow_run_ok "${TEST_NAME_BASE}-restart-pause" \
cylc play --debug --pause "${WORKFLOW_NAME}"

# query jobs
TEST_NAME="${TEST_NAME_BASE}-jobs-query"

read -r -d '' jobsQuery <<_args_
{
"request_string": "
query {
jobs (sort: {keys: [\"submitNum\"]}) {
state
submitNum
}
}",
"variables": null
}
_args_

run_graphql_ok "${TEST_NAME}" "${WORKFLOW_NAME}" "${jobsQuery}"

cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-jobs-query.stdout" << __HERE__
{
"jobs": [
{
"state": "failed",
"submitNum": 1
},
{
"state": "failed",
"submitNum": 2
},
{
"state": "failed",
"submitNum": 3
}
]
}
__HERE__

# stop workflow
cylc stop --max-polls=10 --interval=2 "${WORKFLOW_NAME}"

workflow_run_ok "${TEST_NAME_BASE}-restart-resume" \
cylc play --debug --no-detach "${WORKFLOW_NAME}"

sqlite3 "${WORKFLOW_RUN_DIR}/log/db" 'SELECT * FROM task_pool' >'sqlite3.out'
cmp_ok 'sqlite3.out' </dev/null
#-------------------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ def job_db_row():
return [
'1',
'foo',
'running',
4,
'2020-04-03T13:40:18+13:00',
0,
'2020-04-03T13:40:20+13:00',
'2020-04-03T13:40:30+13:00',
None,
None,
'background',
'20542',
'localhost',
Expand Down

0 comments on commit 0c977d6

Please sign in to comment.