Skip to content

Commit

Permalink
Various minor tweaks (feedback and codacy).
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jun 28, 2018
1 parent f0f438b commit 215a4c4
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 110 deletions.
36 changes: 36 additions & 0 deletions bin/cylc-function-run
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env python2

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2018 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""USAGE: cylc function-run <name> <json-args> <json-kwargs>
INTERNAL USE (asynchronous external trigger function execution)
Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings.
"""

import sys
from cylc.mp_pool import run_function


if __name__ == "__main__":
if sys.argv[1] in ['help', '--help'] or len(sys.argv) != 4:
print __doc__
sys.exit(0)
run_function(sys.argv[1], sys.argv[2], sys.argv[3])
6 changes: 4 additions & 2 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ utility_commands['scp-transfer'] = ['scp-transfer']
utility_commands['suite-state'] = ['suite-state']
utility_commands['ls-checkpoints'] = ['ls-checkpoints']
utility_commands['report-timings'] = ['report-timings']
utility_commands['function-run'] = ['function-run']

hook_commands = {}
hook_commands['email-suite'] = ['email-suite']
Expand All @@ -292,7 +293,6 @@ admin_commands['profile-battery'] = ['profile-battery']
admin_commands['import-examples'] = ['import-examples']
admin_commands['upgrade-run-dir'] = ['upgrade-run-dir']
admin_commands['check-software'] = ['check-software']
admin_commands['wrap-func'] = ['wrap-func']

license_commands = {}
license_commands['warranty'] = ['warranty']
Expand Down Expand Up @@ -364,7 +364,6 @@ comsum['profile-battery'] = 'Run a battery of profiling tests'
comsum['import-examples'] = 'Import example suites your suite run directory'
comsum['upgrade-run-dir'] = 'Upgrade a pre-cylc-6 suite run directory'
comsum['check-software'] = 'Check required software is installed'
comsum['wrap-func'] = '(Internal) Run a function in the process pool'
# license
comsum['warranty'] = 'Print the GPLv3 disclaimer of warranty'
comsum['conditions'] = 'Print the GNU General Public License v3.0'
Expand Down Expand Up @@ -429,13 +428,16 @@ comsum['jobs-poll'] = '(Internal) Retrieve status for task jobs'
comsum['jobs-submit'] = '(Internal) Submit task jobs'
comsum['remote-init'] = '(Internal) Initialise a task remote'
comsum['remote-tidy'] = '(Internal) Tidy a task remote'

# utility
comsum['cycle-point'] = 'Cycle point arithmetic and filename templating'
comsum['jobscript'] = 'Generate a task job script and print it to stdout'
comsum['scp-transfer'] = 'Scp-based file transfer for cylc suites'
comsum['suite-state'] = 'Query the task states in a suite'
comsum['ls-checkpoints'] = 'Display task pool etc at given events'
comsum['report-timings'] = 'Generate a report on task timing data'
comsum['function-run'] = '(Internal) Run a function in the process pool'

# hook
comsum['email-task'] = 'A task event hook script that sends email alerts'
comsum['email-suite'] = 'A suite event hook script that sends email alerts'
Expand Down
6 changes: 3 additions & 3 deletions bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ def main():
glbl_cfg().create_cylc_run_tree(suite)
pool = SuiteProcPool()
db_mgr = SuiteDatabaseManager()
b_mgr = BroadcastMgr(db_mgr)
te_mgr = TaskEventsManager(suite, pool, db_mgr, b_mgr)
task_job_mgr = TaskJobManager(suite, pool, db_mgr, suite_srv_mgr, te_mgr)
task_job_mgr = TaskJobManager(
suite, pool, db_mgr, suite_srv_mgr,
TaskEventsManager(suite, pool, db_mgr, Broadcast(db_mgr)))
task_job_mgr.task_remote_mgr.single_task_mode = True
task_job_mgr.job_file_writer.set_suite_env({
'CYLC_UTC': str(config.cfg['cylc']['UTC mode']),
Expand Down
58 changes: 0 additions & 58 deletions bin/cylc-wrap-func

This file was deleted.

2 changes: 2 additions & 0 deletions doc/src/cylc-user-guide/cug.tex
Original file line number Diff line number Diff line change
Expand Up @@ -6328,6 +6328,8 @@ \subsection{Custom Trigger Functions}
\item recommend using built-in trigger functions as an example
\item refer above for result qualification by label name
\item show another example (the random and echo built-ins?)
\item example graphs
\item debugging trigger functions
\end{myitemize}
\subsection{Current Limitations}
Expand Down
File renamed without changes.
8 changes: 3 additions & 5 deletions etc/dev-bin/defn-order-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import time, string, random
from copy import deepcopy

"""
This is a standalone performance test of the algorithm used in gcylc to
sort namespaces into "definition order", i.e. the order in which they are
defined in the suite.rc file.
"""
# This is a standalone performance test of the algorithm used in gcylc to
# sort namespaces into "definition order", i.e. the order in which they are
# defined in the suite.rc file.

# Number of namespaces.
N=10000
Expand Down
2 changes: 1 addition & 1 deletion etc/dev-bin/start-group-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ rm -rf ~/cylc-run/$DEST
SKIP=4
for GROUP in nwp tst opr; do
for N in 1 2 3 4; do
if (( $SKIP == $N )); then
if (( SKIP == N )); then
SKIP=$(( SKIP - 1))
break
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"""

import re
import sys
import json
from kafka import KafkaConsumer
from cylc.suite_logging import LOG
Expand Down Expand Up @@ -63,7 +62,7 @@ def cylc_kafka_consumer(kafka_server, kafka_topic, group_id, message, debug):
* group_id - determines Kafka offset ownership (see below).
* message - string-ified dict with optional pattern elements (see below).
* debug - boolean; set by daemon debug mode; prints to suite err log.
The topic is first consumed from the beginning, then from the previous
committed offset. If the message is not found by end of topic, commit the
offset and return (to will try again later). If found, return the result.
Expand All @@ -86,7 +85,7 @@ def cylc_kafka_consumer(kafka_server, kafka_topic, group_id, message, debug):
The "message" argument is a stringified dict, e.g.:
{'system': 'prod', 'point': '2025', 'data': '<nwp.*\.nc>'}
should be represented as:
should be represented as:
"system:prod point:2025 data:<nwp.*\.nc>"
A match occurs Kafka if all message dict items match, and the result returned
Expand Down
2 changes: 0 additions & 2 deletions etc/examples/AutoRecover/CleanupTask/bin/fam_cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ sleep 10 # (time to observe the failed tasks in the suite monitor).
# Determine which family member(s) failed, if any
FAILED_TASKS=$(cylc dump $CYLC_SUITE_NAME | grep $CYLC_TASK_CYCLE_TIME | grep failed | sed -e 's/,.*$//')

found_failed_member=false
echo "FAILED TASKS:"
for T in $FAILED_TASKS; do
echo -n " $T ..."
if [[ $T == m_* ]]; then
found_failed_member=true
echo "REMOVING family member"
cylc control remove --force $CYLC_SUITE_NAME ${T}.$CYLC_TASK_CYCLE_TIME
else
Expand Down
4 changes: 3 additions & 1 deletion etc/examples/AutoRecover/EventHook/bin/failhook.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
# hook scripts can intervene in the running of their own suite.

# inputs supplied by cylc
EVENT=$1; SUITE=$2; TASK=$3; MSG=$4
# EVENT=$1 # not needed
SUITE=$2; TASK=$3
# MSG=$4 # not needed

echo "(HOOK SCRIPT: waiting 10 seconds)"
sleep 10 # (time to observe failed task in the suite monitor).
Expand Down
4 changes: 2 additions & 2 deletions etc/examples/task-states/bin/change-my-job-sub-method.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/bin/bash

EVENT=$1 # e.g. "submit_failed"
#EVENT=$1 # e.g. "submit_failed"
SUITE=$2 # name of the suite
TASKID=$3 # ID of the task
MESSAGE="$4" # quotes required (message contains spaces)
#MESSAGE="$4" # quotes required (message contains spaces)

echo "${0}:resetting job submission method with cylc broadcast"

Expand Down
1 change: 0 additions & 1 deletion lib/cylc/cfgspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def coerce_type(in_str):
value = _strip_and_unquote(keys, value)
if not value:
raise IllegalValueError("xtrigger", keys, value)
fctx = None
fname = None
args = []
kwargs = {}
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from isodatetime.data import Calendar, Duration
from isodatetime.dumpers import TimePointDumper
from isodatetime.parsers import TimePointParser, DurationParser
from isodatetime.parsers import DurationParser
from isodatetime.timezone import (
get_local_time_zone, get_local_time_zone_format)
from cylc.time_parser import CylcTimeParser
Expand Down
32 changes: 27 additions & 5 deletions lib/cylc/mp_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Manage queueing and pooling of subprocesses for the suite server program."""

import os
import sys
import time
from pipes import quote
from signal import SIGKILL
Expand All @@ -26,12 +27,33 @@
from collections import deque
from threading import RLock
from cylc.cfgspec.glbl_cfg import glbl_cfg
import time
import json
import traceback
from signal import signal, alarm, SIGALRM
from cylc.suite_logging import LOG
from cylc.wallclock import get_current_time_string
from cylc.xtrigger_mgr import get_func


def run_function(func_name, json_args, json_kwargs):
"""Run a Python function in the process pool.
func_name(*func_args, **func_kwargs)
Redirect any function stdout to stderr (and suite log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is.
"""
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)
func = get_func(func_name)
# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)
# Restore stdout.
sys.stdout = orig_stdout
# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))


class SuiteProcContext(object):
Expand Down Expand Up @@ -123,7 +145,7 @@ class SuiteFuncContext(SuiteProcContext):
"""Represent the context of a function to run in the process pool.
Attributes:
# (See also parent class attributes).
# See also parent class attributes.
.label (str):
function label under [xtriggers] in suite.rc
.func_name (str):
Expand Down Expand Up @@ -151,7 +173,7 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl):

def update_command(self):
"""Update the function wrap command after changes."""
self.cmd = ['cylc-wrap-func', self.func_name,
self.cmd = ['cylc-function-run', self.func_name,
json.dumps(self.func_args),
json.dumps(self.func_kwargs)]

Expand Down
1 change: 1 addition & 0 deletions lib/cylc/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def put_task_event_timers(self, task_events_mgr):
"timeout": timer.timeout})

def put_xtriggers(self, sat_xtrig):
"""Put statements to update external triggers table."""
self.db_deletes_map[self.TABLE_XTRIGGERS].append({})
for sig, res in sat_xtrig.items():
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
Expand Down
6 changes: 2 additions & 4 deletions lib/cylc/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ def satisfy_me(self, all_task_outputs):
def xtriggers_all_satisfied(self):
"""Return True if xclock and all xtriggers are satisfied."""
if self.xclock is not None and not self.xclock[1]:
xclock_satisfied = False
else:
xclock_satisfied = True
return xclock_satisfied and all(self.xtriggers.values())
return False
return all(self.xtriggers.values())

def prerequisites_are_all_satisfied(self):
"""Return True if (non-suicide) prerequisites are fully satisfied."""
Expand Down
21 changes: 9 additions & 12 deletions lib/cylc/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,11 @@ def load_xtrigger_for_restart(self, row_idx, row):

def housekeep(self):
"""Delete satisfied xtriggers and xclocks no longer needed."""
rem = []
for sig in self.sat_xtrig:
for sig in list(self.sat_xtrig):
if sig not in self.all_xtrig:
rem.append(sig)
for r in rem:
del self.sat_xtrig[r]
rem = []
for sig in self.sat_xclock:
if sig not in self.all_xclock:
rem.append(sig)
for r in rem:
self.sat_xclock.remove(r)
del self.sat_xtrig[sig]
self.sat_xclock = [
sig for sig in self.sat_xclock if sig in self.all_xclock]

def satisfy_xclock(self, itask):
"""Attempt to satisfy itask's clock trigger, if it has one."""
Expand Down Expand Up @@ -290,7 +283,11 @@ def callback(self, ctx):
LOG.debug(ctx)
sig = ctx.get_signature()
self.active.remove(sig)
satisfied, results = json.loads(ctx.out)
try:
satisfied, results = json.loads(ctx.out)
except ValueError:
print "WTF?"
return
LOG.debug('%s: returned %s' % (sig, results))
if satisfied:
self.pflag = True
Expand Down
Loading

0 comments on commit 215a4c4

Please sign in to comment.