Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External trigger "plugin" functions. #2423

Merged
merged 18 commits into from
Jul 2, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ admin_commands['profile-battery'] = ['profile-battery']
admin_commands['import-examples'] = ['import-examples']
admin_commands['upgrade-run-dir'] = ['upgrade-run-dir']
admin_commands['check-software'] = ['check-software']
admin_commands['wrap-func'] = ['wrap-func']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More like call-func than wrap-func?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, "call" suggests a normal function call, doesn't it? This wraps a function into a minimal executable program, to execute in the process pool.

Copy link
Member Author

@hjoliver hjoliver Jun 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to function-run - less ambiguous, and no abbrev similarity to other commands (cylc warranty was a problem with wrap-func; and cylc run-func is a no-go thanks to cylc run).


license_commands = {}
license_commands['warranty'] = ['warranty']
Expand Down Expand Up @@ -362,7 +363,8 @@ comsum['test-battery'] = 'Run a battery of self-diagnosing test suites'
comsum['profile-battery'] = 'Run a battery of profiling tests'
comsum['import-examples'] = 'Import example suites your suite run directory'
comsum['upgrade-run-dir'] = 'Upgrade a pre-cylc-6 suite run directory'
comsum['check-software'] = 'Check required software is installed.'
comsum['check-software'] = 'Check required software is installed'
comsum['wrap-func'] = '(Internal) Run a function in the process pool'
# license
comsum['warranty'] = 'Print the GPLv3 disclaimer of warranty'
comsum['conditions'] = 'Print the GNU General Public License v3.0'
Expand Down Expand Up @@ -406,8 +408,8 @@ comsum['poll'] = 'Poll submitted or running tasks'
comsum['kill'] = 'Kill submitted or running tasks'
comsum['hold'] = 'Hold (pause) suites or individual tasks'
comsum['release'] = 'Release (unpause) suites or individual tasks'
comsum['reset'] = 'Force one or more tasks to change state.'
comsum['spawn'] = 'Force one or more tasks to spawn their successors.'
comsum['reset'] = 'Force one or more tasks to change state'
comsum['spawn'] = 'Force one or more tasks to spawn their successors'
comsum['nudge'] = 'Cause the cylc task processing loop to be invoked'
comsum['reload'] = 'Reload the suite definition at run time'
comsum['set-verbosity'] = 'Change a running suite\'s logging verbosity'
Expand Down
9 changes: 7 additions & 2 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ import cylc.flags
from cylc.mp_pool import SuiteProcPool
from cylc.option_parsers import CylcOptionParser as COP
from cylc.suite_db_mgr import SuiteDatabaseManager
from cylc.broadcast_mgr import BroadcastMgr
from cylc.suite_srv_files_mgr import SuiteSrvFilesManager
from cylc.task_id import TaskID
from cylc.task_job_mgr import TaskJobManager
from cylc.task_events_mgr import TaskEventsManager
from cylc.task_proxy import TaskProxy
from cylc.task_state import TASK_STATUS_SUBMIT_FAILED
from cylc.templatevars import load_template_vars
Expand Down Expand Up @@ -101,8 +103,11 @@ def main():

# Initialise job submit environment
glbl_cfg().create_cylc_run_tree(suite)
task_job_mgr = TaskJobManager(
suite, SuiteProcPool(), SuiteDatabaseManager(), suite_srv_mgr)
pool = SuiteProcPool()
db_mgr = SuiteDatabaseManager()
b_mgr = BroadcastMgr(db_mgr)
te_mgr = TaskEventsManager(suite, pool, db_mgr, b_mgr)
task_job_mgr = TaskJobManager(suite, pool, db_mgr, suite_srv_mgr, te_mgr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change still necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean exactly by "this change"? Oh, maybe just that the te_mgr and b_bgr variables are unnecessary as only used once? I'll change that...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

task_job_mgr.task_remote_mgr.single_task_mode = True
task_job_mgr.job_file_writer.set_suite_env({
'CYLC_UTC': str(config.cfg['cylc']['UTC mode']),
Expand Down
28 changes: 6 additions & 22 deletions bin/cylc-suite-state
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ from cylc.dbstatecheck import CylcSuiteDBChecker
from cylc.cfgspec.glbl_cfg import glbl_cfg
from cylc.command_polling import Poller
from cylc.task_state import TASK_STATUSES_ORDERED
from cylc.cycling.util import add_offset

from isodatetime.parsers import TimePointParser, DurationParser
from isodatetime.parsers import TimePointParser


class SuitePoller(Poller):
Expand Down Expand Up @@ -200,27 +201,10 @@ def main():

# Attempt to apply specified offset to the targeted cycle
if options.offset:
my_parser = TimePointParser()
my_target_point = my_parser.parse(options.cycle, dump_as_parsed=True)
my_offset_parser = DurationParser()

oper = "+"
if options.offset.startswith("-") or options.offset.startswith("+"):
oper = options.offset[0]
options.offset = options.offset[1:]
if options.offset.startswith("P"):
try:
my_shift = my_offset_parser.parse(options.offset)
except ValueError:
sys.exit("ERROR: Cannot parse offset: %s" % options.offset)
if oper == "-":
my_target_point -= my_shift
else:
my_target_point += my_shift
else:
sys.exit("ERROR: Unrecognised offset: %s" % options.offset)

options.cycle = str(my_target_point)
try:
options.cycle = str(add_offset(options.cycle, options.offset))
except ValueError as exc:
sys.exit(exc)

# Exit if both task state and message are to being polled
if options.status and options.msg:
Expand Down
15 changes: 9 additions & 6 deletions bin/cylc-validate
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,12 @@ def main():
profiler.start()

suite, suiterc = SuiteSrvFilesManager().parse_suite_arg(options, args[0])

cfg = SuiteConfig(
suite, suiterc,
load_template_vars(options.templatevars, options.templatevars_file),
cli_initial_point_string=options.icp,
is_validate=True, strict=options.strict, run_mode=options.run_mode,
output_fname=options.output,
mem_log_func=profiler.log_memory)
output_fname=options.output, mem_log_func=profiler.log_memory)

# Instantiate tasks and force evaluation of trigger expressions.
# (Taken from config.py to avoid circular import problems.)
Expand All @@ -103,9 +101,14 @@ def main():
try:
itask.state.prerequisites_eval_all()
except TriggerExpressionError as exc:
print >> sys.stderr, str(exc)
raise SuiteConfigError(
'ERROR, %s: invalid trigger expression.' % name)
err = str(exc)
if '@' in err:
print >> sys.stderr, (
"ERROR, %s: xtriggers can't be in conditional"
" expressions: %s" % (name, err))
else:
print >> sys.stderr, 'ERROR, %s: bad trigger: %s' % (name, err)
raise SuiteConfigError("ERROR: bad trigger")
except Exception as exc:
print >> sys.stderr, str(exc)
raise SuiteConfigError(
Expand Down
58 changes: 58 additions & 0 deletions bin/cylc-wrap-func
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python2

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2018 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
INTERNAL USE (asynchronous xtrigger function execution).

USAGE: cylc wrap-func <func-name> <'json-func-args'> <'json-func-kwargs'>

Run "func-name(*func-args, **func-kwargs)" in the command process pool.

The function must be defined in a module of the same name (func-name).

The function's stdout is redirected to stderr, and visible in suite log in
debug mode. Its return value is printed to stdout as a JSON string.
"""

# Function return value to stdout: for compat with the command process pool.

import importlib
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you forgot to fetch my updates. This command has changed name to cylc-function-run, and the unused import removed.

import json
import sys
from cylc.xtrigger_mgr import get_func


def run_func(func_name, func_args, func_kwargs):
func = get_func(func_name)
# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)
# Restore stdout.
sys.stdout = orig_stdout
# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))


if __name__ == "__main__":
if sys.argv[1] in ['help', '--help']:
print __doc__
sys.exit(0)
func_name = sys.argv[1]
func_args = json.loads(sys.argv[2])
func_kwargs = json.loads(sys.argv[3])
run_func(func_name, func_args, func_kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be good to move the json-load => call function => json-dump logic to a location under lib/ (in the future?). (See also protocols such as JSON-RPC.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do (pref "for the future" as I haven't time to look at JSON-RPC right now)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've taken your advice on move into lib/cylc.

Loading