Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xtrigger efficiency fix. #5908

Merged
merged 7 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5908.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug causing redundant DB method calls when many tasks depend on the same xtrigger.
32 changes: 9 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ async def initialise(self):
self.workflow,
user=self.owner,
broadcast_mgr=self.broadcast_mgr,
workflow_db_mgr=self.workflow_db_mgr,
data_store_mgr=self.data_store_mgr,
proc_pool=self.proc_pool,
workflow_run_dir=self.workflow_run_dir,
Expand Down Expand Up @@ -1705,14 +1706,8 @@ async def main_loop(self) -> None:
await self.process_command_queue()
self.proc_pool.process()

# Tasks in the main pool that are waiting but not queued must be
# waiting on external dependencies, i.e. xtriggers or ext_triggers.
# For these tasks, call any unsatisfied xtrigger functions, and
# queue tasks that have become ready. (Tasks do not appear in the
# main pool at all until all other-task deps are satisfied, and are
# queued immediately on release from runahead limiting if they are
# not waiting on external deps).
housekeep_xtriggers = False
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
Expand All @@ -1725,28 +1720,19 @@ async def main_loop(self) -> None:
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
# Call unsatisfied xtriggers if not already in-process.
# Results are returned asynchronously.
self.xtrigger_mgr.call_xtriggers_async(itask)
# Check for satisfied xtriggers, and queue if ready.
if self.xtrigger_mgr.check_xtriggers(
itask, self.workflow_db_mgr.put_xtriggers):
housekeep_xtriggers = True
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

# Check for satisfied ext_triggers, and queue if ready.

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
and self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
and all(itask.is_ready_to_run())
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,15 @@ def process_queued_ops(self) -> None:
self.db_deletes_map.items()):
while db_deletes:
where_args = db_deletes.pop(0)
# LOG.debug(f"DB {table_name} DEL {where_args}")
self.pri_dao.add_delete_item(table_name, where_args)
self.pub_dao.add_delete_item(table_name, where_args)
if any(self.db_inserts_map.values()):
for table_name, db_inserts in sorted(
self.db_inserts_map.items()):
while db_inserts:
db_insert = db_inserts.pop(0)
# LOG.debug(f"DB {table_name} INS {db_insert}")
self.pri_dao.add_insert_item(table_name, db_insert)
self.pub_dao.add_insert_item(table_name, db_insert)
if (hasattr(self, 'db_updates_map') and
Expand All @@ -248,6 +250,8 @@ def process_queued_ops(self) -> None:
self.db_updates_map.items()):
while db_updates:
set_args, where_args = db_updates.pop(0)
# LOG.debug(
# f"DB {table_name} UPD {set_args} WHERE {where_args}")
self.pri_dao.add_update_item(
table_name, set_args, where_args)
self.pub_dao.add_update_item(
Expand Down
82 changes: 45 additions & 37 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
import re
from copy import deepcopy
from time import time
from typing import Any, Dict, List, Optional, Tuple, Callable
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING

from cylc.flow import LOG
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_func
from cylc.flow.xtriggers.wall_clock import wall_clock

from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.subprocpool import get_func
if TYPE_CHECKING:
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


class TemplateVariables(Enum):
Expand Down Expand Up @@ -185,6 +187,7 @@
Args:
workflow: workflow name
user: workflow owner
workflow_db_mgr: the DB Manager
broadcast_mgr: the Broadcast Manager
proc_pool: pool of Subprocesses
workflow_run_dir: workflow run directory
Expand All @@ -195,9 +198,10 @@
def __init__(
self,
workflow: str,
broadcast_mgr: BroadcastMgr,
data_store_mgr: DataStoreMgr,
proc_pool: SubProcPool,
broadcast_mgr: 'BroadcastMgr',
workflow_db_mgr: 'WorkflowDatabaseManager',
data_store_mgr: 'DataStoreMgr',
proc_pool: 'SubProcPool',
user: Optional[str] = None,
workflow_run_dir: Optional[str] = None,
workflow_share_dir: Optional[str] = None,
Expand Down Expand Up @@ -230,11 +234,15 @@
}

self.proc_pool = proc_pool
self.workflow_db_mgr = workflow_db_mgr
self.broadcast_mgr = broadcast_mgr
self.data_store_mgr = data_store_mgr
self.do_housekeeping = False

@staticmethod
def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
def validate_xtrigger(
label: str, fctx: 'SubFuncContext', fdir: str
) -> None:
"""Validate an Xtrigger function.

Args:
Expand Down Expand Up @@ -305,7 +313,7 @@
f' {", ".join(t.value for t in deprecated_variables)}'
)

def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.

Check the xtrigger function exists here (e.g. during validation).
Expand Down Expand Up @@ -334,7 +342,7 @@
sig, results = row
self.sat_xtrig[sig] = json.loads(results)

def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False,
sigs_only: bool = False):
"""(Internal helper method.)

Expand All @@ -361,7 +369,9 @@
res.append((label, sig, ctx, satisfied))
return res

def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
def get_xtrig_ctx(
self, itask: 'TaskProxy', label: str
) -> 'SubFuncContext':
"""Get a real function context from the template.

Args:
Expand Down Expand Up @@ -412,7 +422,7 @@
ctx.update_command(self.workflow_run_dir)
return ctx

def call_xtriggers_async(self, itask: TaskProxy):
def call_xtriggers_async(self, itask: 'TaskProxy'):
"""Call itask's xtrigger functions via the process pool...

...if previous call not still in-process and retry period is up.
Expand All @@ -421,16 +431,23 @@
itask: task proxy to check.
"""
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
# Special case: quick synchronous clock check:
if sig.startswith("wall_clock"):
# Special case: quick synchronous clock check.
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
if sig in self.sat_xtrig:
# Already satisfied, just update the task
itask.state.xtriggers[label] = True

Check warning on line 438 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L438

Added line #L438 was not covered by tests
elif wall_clock(*ctx.func_args, **ctx.func_kwargs):
# Newly satisfied
itask.state.xtriggers[label] = True
self.sat_xtrig[sig] = {}
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: {}})
LOG.info('xtrigger satisfied: %s = %s', label, sig)
self.do_housekeeping = True
continue
# General case: potentially slow asynchronous function call.
if sig in self.sat_xtrig:
# Already satisfied, just update the task
if not itask.state.xtriggers[label]:
itask.state.xtriggers[label] = True
res = {}
Expand All @@ -445,6 +462,8 @@
xtrigger_env
)
continue

# Call the function to check the unsatisfied xtrigger.
if sig in self.active:
# Already waiting on this result.
continue
Expand All @@ -457,8 +476,10 @@
self.active.append(sig)
self.proc_pool.put_command(ctx, callback=self.callback)

def housekeep(self, itasks: List[TaskProxy]):
"""Delete satisfied xtriggers no longer needed by any task.
def housekeep(self, itasks):
"""Forget satisfied xtriggers no longer needed by any task.

Check self.do_housekeeping before calling this method.

Args:
itasks: list of all task proxies.
Expand All @@ -469,8 +490,9 @@
for sig in list(self.sat_xtrig):
if sig not in all_xtrig:
del self.sat_xtrig[sig]
self.do_housekeeping = False

def callback(self, ctx: SubFuncContext):
def callback(self, ctx: 'SubFuncContext'):
"""Callback for asynchronous xtrigger functions.

Record satisfaction status and function results dict.
Expand All @@ -489,23 +511,9 @@
return
LOG.debug('%s: returned %s', sig, results)
if satisfied:
# Newly satisfied
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: results})
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results

def check_xtriggers(
self,
itask: TaskProxy,
db_update_func: Callable[[dict], None]) -> bool:
"""Check if all of itasks' xtriggers have become satisfied.

Return True if satisfied, else False

Args:
itasks: task proxies to check
db_update_func: method to update xtriggers in the DB
"""
if itask.state.xtriggers_all_satisfied():
db_update_func(self.sat_xtrig)
return True
return False
self.do_housekeeping = True
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ tests =
pytest-cov>=2.8.0
pytest-xdist>=2
pytest-env>=0.6.2
pytest-mock>=3.7
pytest>=6
testfixtures>=6.11.0
towncrier>=23
Expand Down
1 change: 1 addition & 0 deletions tests/functional/xtriggers/02-persistence/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[scheduling]
initial cycle point = 2010
final cycle point = 2011
runahead limit = P0
[[xtriggers]]
x1 = faker(name="bob")
[[graph]]
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""Tests for the behaviour of xtrigger manager.
"""

from pytest_mock import mocker

async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
Expand Down Expand Up @@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
'clock_2': False,
'clock_3': False,
}


async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
"""
If multiple tasks depend on the same satisfied xtrigger, the DB mgr method
put_xtriggers should only be called once - when the xtrigger gets satisfied.

See GitHub #5908

"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
},
'graph': {
'R1': '@clock_1 => foo & bar'
}
}
})

schd = scheduler(id_)
spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers')

async with start(schd):

# Call the clock trigger via its dependent tasks, to get it satisfied.
for task in schd.pool.get_tasks():
# (For clock triggers this is synchronous)
schd.xtrigger_mgr.call_xtriggers_async(task)

# It should now be satisfied.
assert task.state.xtriggers == {'clock_1': True}

# Check one put_xtriggers call only, not two.
assert spy.call_count == 1

# Note on master prior to GH #5908 the call is made from the
# scheduler main loop when the two tasks become satisified,
# resulting in two calls to put_xtriggers. This test fails
# on master, but with call count 0 (not 2) because the main
# loop doesn't run in this test.

1 change: 1 addition & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager:
workflow=workflow_name,
user=user,
proc_pool=Mock(put_command=lambda *a, **k: True),
workflow_db_mgr=Mock(housekeep=lambda *a, **k: True),
broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True),
data_store_mgr=DataStoreMgr(
create_autospec(Scheduler, workflow=workflow_name, owner=user)
Expand Down
Loading
Loading