Skip to content

Commit

Permalink
Merge pull request #5131 from MetRonnie/workflow_state
Browse files Browse the repository at this point in the history
`workflow_state` xtrigger: infer run number
  • Loading branch information
hjoliver authored Sep 14, 2022
2 parents 8f60601 + f96536a commit e62bc92
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 12 deletions.
10 changes: 10 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->
-------------------------------------------------------------------------------
## __cylc-8.0.3 (<span actions:bind='release-date'>Pending YYYY-MM-DD</span>)__

Maintenance release.

### Fixes

[#5131](https://github.com/cylc/cylc-flow/pull/5131) - Infer workflow run number
for `workflow_state` xtrigger.

-------------------------------------------------------------------------------
## __cylc-8.0.2 (<span actions:bind='release-date'>Released 2022-09-12</span>)__

Expand Down
36 changes: 24 additions & 12 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,52 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import sqlite3
from typing import Dict, Optional, Tuple

from metomi.isodatetime.parsers import TimePointParser

from cylc.flow.cycling.util import add_offset
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.pathutil import expand_path, get_cylc_run_dir
from metomi.isodatetime.parsers import TimePointParser
from cylc.flow.workflow_files import infer_latest_run


def workflow_state(workflow, task, point, offset=None, status='succeeded',
message=None, cylc_run_dir=None):
def workflow_state(
workflow: str,
task: str,
point: str,
offset: Optional[str] = None,
status: str = 'succeeded',
message: Optional[str] = None,
cylc_run_dir: Optional[str] = None
) -> Tuple[bool, Optional[Dict[str, Optional[str]]]]:
"""Connect to a workflow DB and query the requested task state.
* Reports satisfied only if the remote workflow state has been achieved.
* Returns all workflow state args to pass on to triggering tasks.
Arguments:
workflow (str):
workflow:
The workflow to interrogate.
task (str):
task:
The name of the task to query.
point (str):
point:
The cycle point.
offset (str):
offset:
The offset between the cycle this xtrigger is used in and the one
it is querying for as an ISO8601 time duration.
e.g. PT1H (one hour).
status (str):
status:
The task status required for this xtrigger to be satisfied.
message (str):
message:
The custom task output required for this xtrigger to be satisfied.
.. note::
This cannot be specified in conjunction with ``status``.
cylc_run_dir (str):
cylc_run_dir:
The directory in which the workflow to interrogate.
.. note::
Expand All @@ -60,9 +71,9 @@ def workflow_state(workflow, task, point, offset=None, status='succeeded',
Returns:
tuple: (satisfied, results)
satisfied (bool):
satisfied:
True if ``satisfied`` else ``False``.
results (dict):
results:
Dictionary containing the args / kwargs which were provided
to this xtrigger.
Expand All @@ -73,6 +84,7 @@ def workflow_state(workflow, task, point, offset=None, status='succeeded',
cylc_run_dir = get_cylc_run_dir()
if offset is not None:
point = str(add_offset(point, offset))
_, workflow = infer_latest_run(Path(cylc_run_dir, workflow))
try:
checker = CylcWorkflowDBChecker(cylc_run_dir, workflow)
except (OSError, sqlite3.Error):
Expand Down
Empty file.
40 changes: 40 additions & 0 deletions tests/unit/xtriggers/test_workflow_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Callable
from unittest.mock import Mock


from cylc.flow.xtriggers.workflow_state import workflow_state
from ..conftest import MonkeyMock


def test_inferred_run(tmp_run_dir: Callable, monkeymock: MonkeyMock):
"""Test that the workflow_state xtrigger infers the run number"""
reg = 'isildur'
expected_workflow_id = f'{reg}/run1'
cylc_run_dir = str(tmp_run_dir())
tmp_run_dir(expected_workflow_id, installed=True, named=True)
mock_db_checker = monkeymock(
'cylc.flow.xtriggers.workflow_state.CylcWorkflowDBChecker',
return_value=Mock(
get_remote_point_format=lambda: 'CCYY',
)
)

_, results = workflow_state(reg, task='precious', point='3000')
mock_db_checker.assert_called_once_with(cylc_run_dir, expected_workflow_id)
assert results['workflow'] == expected_workflow_id

0 comments on commit e62bc92

Please sign in to comment.