Skip to content

Commit

Permalink
Merge pull request #2423 from hjoliver/ext-trig-plug
Browse files Browse the repository at this point in the history
External trigger "plugin" functions.
  • Loading branch information
hjoliver authored Jul 2, 2018
2 parents fccbdf1 + b35f458 commit 1dca33e
Show file tree
Hide file tree
Showing 59 changed files with 2,201 additions and 286 deletions.
37 changes: 37 additions & 0 deletions bin/cylc-function-run
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python2

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2018 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""USAGE: cylc function-run <name> <json-args> <json-kwargs> <src-dir>
INTERNAL USE (asynchronous external trigger function execution)
Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings. <src-dir> is the suite source dir, needed to find
local xtrigger modules.
"""

import sys
from cylc.mp_pool import run_function


if __name__ == "__main__":
if sys.argv[1] in ['help', '--help'] or len(sys.argv) != 5:
print __doc__
sys.exit(0)
run_function(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
10 changes: 7 additions & 3 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ utility_commands['scp-transfer'] = ['scp-transfer']
utility_commands['suite-state'] = ['suite-state']
utility_commands['ls-checkpoints'] = ['ls-checkpoints']
utility_commands['report-timings'] = ['report-timings']
utility_commands['function-run'] = ['function-run']

hook_commands = {}
hook_commands['email-suite'] = ['email-suite']
Expand Down Expand Up @@ -362,7 +363,7 @@ comsum['test-battery'] = 'Run a battery of self-diagnosing test suites'
comsum['profile-battery'] = 'Run a battery of profiling tests'
comsum['import-examples'] = 'Import example suites your suite run directory'
comsum['upgrade-run-dir'] = 'Upgrade a pre-cylc-6 suite run directory'
comsum['check-software'] = 'Check required software is installed.'
comsum['check-software'] = 'Check required software is installed'
# license
comsum['warranty'] = 'Print the GPLv3 disclaimer of warranty'
comsum['conditions'] = 'Print the GNU General Public License v3.0'
Expand Down Expand Up @@ -406,8 +407,8 @@ comsum['poll'] = 'Poll submitted or running tasks'
comsum['kill'] = 'Kill submitted or running tasks'
comsum['hold'] = 'Hold (pause) suites or individual tasks'
comsum['release'] = 'Release (unpause) suites or individual tasks'
comsum['reset'] = 'Force one or more tasks to change state.'
comsum['spawn'] = 'Force one or more tasks to spawn their successors.'
comsum['reset'] = 'Force one or more tasks to change state'
comsum['spawn'] = 'Force one or more tasks to spawn their successors'
comsum['nudge'] = 'Cause the cylc task processing loop to be invoked'
comsum['reload'] = 'Reload the suite definition at run time'
comsum['set-verbosity'] = 'Change a running suite\'s logging verbosity'
Expand All @@ -427,13 +428,16 @@ comsum['jobs-poll'] = '(Internal) Retrieve status for task jobs'
comsum['jobs-submit'] = '(Internal) Submit task jobs'
comsum['remote-init'] = '(Internal) Initialise a task remote'
comsum['remote-tidy'] = '(Internal) Tidy a task remote'

# utility
comsum['cycle-point'] = 'Cycle point arithmetic and filename templating'
comsum['jobscript'] = 'Generate a task job script and print it to stdout'
comsum['scp-transfer'] = 'Scp-based file transfer for cylc suites'
comsum['suite-state'] = 'Query the task states in a suite'
comsum['ls-checkpoints'] = 'Display task pool etc at given events'
comsum['report-timings'] = 'Generate a report on task timing data'
comsum['function-run'] = '(Internal) Run a function in the process pool'

# hook
comsum['email-task'] = 'A task event hook script that sends email alerts'
comsum['email-suite'] = 'A suite event hook script that sends email alerts'
Expand Down
7 changes: 6 additions & 1 deletion bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ import cylc.flags
from cylc.mp_pool import SuiteProcPool
from cylc.option_parsers import CylcOptionParser as COP
from cylc.suite_db_mgr import SuiteDatabaseManager
from cylc.broadcast_mgr import BroadcastMgr
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.task_id import TaskID
from cylc.task_job_mgr import TaskJobManager
from cylc.task_events_mgr import TaskEventsManager
from cylc.task_proxy import TaskProxy
from cylc.task_state import TASK_STATUS_SUBMIT_FAILED
from cylc.templatevars import load_template_vars
Expand Down Expand Up @@ -101,8 +103,11 @@ def main():

# Initialise job submit environment
glbl_cfg().create_cylc_run_tree(suite)
pool = SuiteProcPool()
db_mgr = SuiteDatabaseManager()
task_job_mgr = TaskJobManager(
suite, SuiteProcPool(), SuiteDatabaseManager(), suite_srv_mgr)
suite, pool, db_mgr, suite_srv_mgr,
TaskEventsManager(suite, pool, db_mgr, BroadcastMgr(db_mgr)))
task_job_mgr.task_remote_mgr.single_task_mode = True
task_job_mgr.job_file_writer.set_suite_env({
'CYLC_UTC': str(config.cfg['cylc']['UTC mode']),
Expand Down
28 changes: 6 additions & 22 deletions bin/cylc-suite-state
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ from cylc.dbstatecheck import CylcSuiteDBChecker
from cylc.cfgspec.glbl_cfg import glbl_cfg
from cylc.command_polling import Poller
from cylc.task_state import TASK_STATUSES_ORDERED
from cylc.cycling.util import add_offset

from isodatetime.parsers import TimePointParser, DurationParser
from isodatetime.parsers import TimePointParser


class SuitePoller(Poller):
Expand Down Expand Up @@ -200,27 +201,10 @@ def main():

# Attempt to apply specified offset to the targeted cycle
if options.offset:
my_parser = TimePointParser()
my_target_point = my_parser.parse(options.cycle, dump_as_parsed=True)
my_offset_parser = DurationParser()

oper = "+"
if options.offset.startswith("-") or options.offset.startswith("+"):
oper = options.offset[0]
options.offset = options.offset[1:]
if options.offset.startswith("P"):
try:
my_shift = my_offset_parser.parse(options.offset)
except ValueError:
sys.exit("ERROR: Cannot parse offset: %s" % options.offset)
if oper == "-":
my_target_point -= my_shift
else:
my_target_point += my_shift
else:
sys.exit("ERROR: Unrecognised offset: %s" % options.offset)

options.cycle = str(my_target_point)
try:
options.cycle = str(add_offset(options.cycle, options.offset))
except ValueError as exc:
sys.exit(exc)

# Exit if both task state and message are to being polled
if options.status and options.msg:
Expand Down
15 changes: 9 additions & 6 deletions bin/cylc-validate
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,12 @@ def main():
profiler.start()

suite, suiterc = SuiteSrvFilesManager().parse_suite_arg(options, args[0])

cfg = SuiteConfig(
suite, suiterc,
load_template_vars(options.templatevars, options.templatevars_file),
cli_initial_point_string=options.icp,
is_validate=True, strict=options.strict, run_mode=options.run_mode,
output_fname=options.output,
mem_log_func=profiler.log_memory)
output_fname=options.output, mem_log_func=profiler.log_memory)

# Instantiate tasks and force evaluation of trigger expressions.
# (Taken from config.py to avoid circular import problems.)
Expand All @@ -103,9 +101,14 @@ def main():
try:
itask.state.prerequisites_eval_all()
except TriggerExpressionError as exc:
print >> sys.stderr, str(exc)
raise SuiteConfigError(
'ERROR, %s: invalid trigger expression.' % name)
err = str(exc)
if '@' in err:
print >> sys.stderr, (
"ERROR, %s: xtriggers can't be in conditional"
" expressions: %s" % (name, err))
else:
print >> sys.stderr, 'ERROR, %s: bad trigger: %s' % (name, err)
raise SuiteConfigError("ERROR: bad trigger")
except Exception as exc:
print >> sys.stderr, str(exc)
raise SuiteConfigError(
Expand Down
Loading

0 comments on commit 1dca33e

Please sign in to comment.