Skip to content

Commit

Permalink
change batch queries for sqlite compat
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Feb 8, 2022
1 parent ad61f07 commit cd3d236
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 36 deletions.
38 changes: 20 additions & 18 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,12 +869,12 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
if itask.submit_num > 0:
flow_db = self.schd.workflow_db_mgr.pri_dao
for row in flow_db.select_jobs_for_datastore(
{(tproxy.cycle_point, tproxy.name)}
{itask.tokens.relative_id}
):
self.insert_db_job(1, row)
else:
# Batch non-active node for load of DB history.
self.db_load_task_proxies[(point_string, name)] = (
self.db_load_task_proxies[itask.tokens.relative_id] = (
itask,
is_parent,
)
Expand Down Expand Up @@ -1044,14 +1044,18 @@ def apply_task_proxy_db_history(self):

flow_db = self.schd.workflow_db_mgr.pri_dao

cycle_name_pairs = set(self.db_load_task_proxies.keys())
task_ids = set(self.db_load_task_proxies.keys())

# Batch load rows with matching cycle & name column pairs.
prereq_tasks_args = set()
prereq_ids = set()
for (
cycle, name, flow_nums_str, status, submit_num, outputs_str
) in flow_db.select_tasks_for_datastore(cycle_name_pairs):
itask, is_parent = self.db_load_task_proxies[(cycle, name)]
) in flow_db.select_tasks_for_datastore(task_ids):
tokens = self.id_.duplicate(
cycle=cycle,
task=name,
)
itask, is_parent = self.db_load_task_proxies[tokens.relative_id]
itask.submit_num = submit_num
flow_nums = set(json.loads(flow_nums_str))
# Do not set states and outputs for future tasks in flow.
Expand All @@ -1076,15 +1080,19 @@ def apply_task_proxy_db_history(self):
for message in json.loads(outputs_str).values():
itask.state.outputs.set_completion(message, True)
# Gather tasks with flow id.
prereq_tasks_args.add((cycle, name, flow_nums_str))
prereq_ids.add(f'{tokens.relative_id}/{flow_nums_str}')

# Batch load prerequisites of tasks according to flow.
prereqs_map = {}
for (
cycle, name, prereq_name,
prereq_cycle, prereq_output, satisfied
) in flow_db.select_prereqs_for_datastore(prereq_tasks_args):
prereqs_map.setdefault((cycle, name), {})[
) in flow_db.select_prereqs_for_datastore(prereq_ids):
tokens = self.id_.duplicate(
cycle=cycle,
task=name,
)
prereqs_map.setdefault(tokens.relative_id, {})[
(prereq_cycle, prereq_name, prereq_output)
] = satisfied if satisfied != '0' else False

Expand All @@ -1096,22 +1104,16 @@ def apply_task_proxy_db_history(self):
itask_prereq.satisfied[key] = prereqs[key]

# Extract info from itasks to data-store.
for ikey, task_info in self.db_load_task_proxies.items():
cycle, name = ikey
for task_info in self.db_load_task_proxies.values():
self.process_internal_task_proxy(
task_info[0],
self.added[TASK_PROXIES][
self.id_.duplicate(
cycle=cycle,
task=name,
).id
self.id_.duplicate(task_info[0].tokens).id
]
)

flow_db.select_prereqs_for_datastore(prereq_tasks_args)

# Batch load jobs from DB.
for row in flow_db.select_jobs_for_datastore(cycle_name_pairs):
for row in flow_db.select_jobs_for_datastore(task_ids):
self.insert_db_job(1, row)

self.db_load_task_proxies.clear()
Expand Down
33 changes: 15 additions & 18 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,10 +880,10 @@ def select_task_times(self):
return columns, list(self.connect().execute(stmt))

def select_tasks_for_datastore(
self, cycle_name_pairs
self, task_ids
):
"""Select state and outputs of specified tasks."""
if not cycle_name_pairs:
if not task_ids:
return []
form_stmt = r"""
SELECT
Expand All @@ -900,26 +900,25 @@ def select_tasks_for_datastore(
ON %(task_states)s.cycle == %(task_outputs)s.cycle AND
%(task_states)s.name == %(task_outputs)s.name
WHERE
(%(task_states)s.cycle, %(task_states)s.name) IN (
VALUES %(cycle_name_pairs)s
%(task_states)s.cycle || '/' || %(task_states)s.name IN (
%(task_ids)s
)
GROUP BY
%(task_states)s.cycle, %(task_states)s.name
"""
form_data = {
"task_states": self.TABLE_TASK_STATES,
"task_outputs": self.TABLE_TASK_OUTPUTS,
"cycle_name_pairs": ', '.join(
f'{val}' for val in cycle_name_pairs),
"task_ids": ', '.join(f"'{val}'" for val in task_ids),
}
stmt = form_stmt % form_data
return list(self.connect().execute(stmt))

def select_prereqs_for_datastore(
self, prereq_tasks_args
self, prereq_ids
):
"""Select prerequisites of specified tasks."""
if not prereq_tasks_args:
if not prereq_ids:
return []
form_stmt = r"""
SELECT
Expand All @@ -932,23 +931,22 @@ def select_prereqs_for_datastore(
FROM
%(prerequisites)s
WHERE
(cycle, name, flow_nums) IN (
VALUES %(prereq_tasks_args)s
cycle || '/' || name || '/' || flow_nums IN (
%(prereq_tasks_args)s
)
"""
form_data = {
"prerequisites": self.TABLE_TASK_PREREQUISITES,
"prereq_tasks_args": ', '.join(
f'{val}' for val in prereq_tasks_args),
"prereq_tasks_args": ', '.join(f"'{val}'" for val in prereq_ids),
}
stmt = form_stmt % form_data
return list(self.connect().execute(stmt))

def select_jobs_for_datastore(
self, cycle_name_pairs
self, task_ids
):
"""Select jobs of specified tasks."""
if not cycle_name_pairs:
if not task_ids:
return []
form_stmt = r"""
SELECT
Expand All @@ -970,17 +968,16 @@ def select_jobs_for_datastore(
%(task_jobs)s.name == %(task_states)s.name AND
%(task_jobs)s.submit_num == %(task_states)s.submit_num
WHERE
(%(task_states)s.cycle, %(task_states)s.name) IN (
VALUES %(cycle_name_pairs)s
%(task_states)s.cycle || '/' || %(task_states)s.name IN (
%(task_ids)s
)
ORDER BY
%(task_states)s.submit_num DESC
"""
form_data = {
"task_states": self.TABLE_TASK_STATES,
"task_jobs": self.TABLE_TASK_JOBS,
"cycle_name_pairs": ', '.join(
f'{val}' for val in cycle_name_pairs),
"task_ids": ', '.join(f"'{val}'" for val in task_ids),
}
stmt = form_stmt % form_data
return list(self.connect().execute(stmt))
Expand Down

0 comments on commit cd3d236

Please sign in to comment.