Skip to content

Commit

Permalink
Merge pull request #1365 from matthewrmshin/cylc-log-work-restructure
Browse files Browse the repository at this point in the history
Suite log/job/ and work/ restructure
  • Loading branch information
arjclark committed Aug 12, 2014
2 parents b307bdc + 557ad27 commit 386d90c
Show file tree
Hide file tree
Showing 38 changed files with 396 additions and 622 deletions.
4 changes: 2 additions & 2 deletions doc/rose-configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,8 @@ <h2 id="appendix-rose-ana-config">Appendix: rose-ana configuration
[Compare Gradiant]
comparison=Exact
extract=OutputGrepper:' gradJ= \s*(\S+)'
kgo1file=$SUITE_CONTROL_DIR/log/job/{}.1.1.out
resultfile=$ROSE_SUITE_DIR/log/job/{}.1.1.out
kgo1file=$SUITE_CONTROL_DIR/log/job/1/{}/NN/out
resultfile=$ROSE_SUITE_DIR/log/job/1/{}/NN/out
</pre>
</div>
</body>
Expand Down
40 changes: 21 additions & 19 deletions lib/python/rose/apps/rose_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RosePruneApp(BuiltinApp):
SCHEME = "rose_prune"
SECTION = "prune"

def run(self, app_runner, conf_tree, *_):
def run(self, app_runner, conf_tree, opts, args, uuid, work_files):
"""Suite housekeeping application.
This application is designed to work under "rose task-run" in a cycling
Expand All @@ -60,25 +60,9 @@ def run(self, app_runner, conf_tree, *_):
if archive_logs_cycles:
app_runner.suite_engine_proc.job_logs_archive(
suite_name, archive_logs_cycles)
globs = []
globs = (self._get_prune_globs(app_runner, conf_tree, "datac") +
self._get_prune_globs(app_runner, conf_tree, "work"))
suite_engine_proc = app_runner.suite_engine_proc
for key, max_args in [("datac", 1), ("work", 2)]:
for cycle, cycle_args in self._get_conf(conf_tree,
"prune-" + key + "-at",
max_args=max_args):
tail_globs = None
if cycle_args:
tail_globs = shlex.split(cycle_args.pop())
head_globs = None
if cycle_args:
head_globs = shlex.split(cycle_args.pop())
for head in suite_engine_proc.get_cycle_items_globs(
key, cycle, head_globs):
if tail_globs:
for tail_glob in tail_globs:
globs.append(os.path.join(head, tail_glob))
else:
globs.append(head)
hosts = suite_engine_proc.get_suite_jobs_auths(suite_name)
suite_dir_rel = suite_engine_proc.get_suite_dir_rel(suite_name)
form_dict = {"d": suite_dir_rel, "g": " ".join(globs)}
Expand Down Expand Up @@ -164,3 +148,21 @@ def _get_conf(self, conf_tree, key, max_args=0):
else:
items.append(cycle)
return items

def _get_prune_globs(self, app_runner, conf_tree, key):
"""Return prune globs for "key"."""
globs = []
for cycle, cycle_args in self._get_conf(conf_tree,
"prune-" + key + "-at",
max_args=1):
tail_globs = None
if cycle_args:
tail_globs = shlex.split(cycle_args.pop())
for head in app_runner.suite_engine_proc.get_cycle_items_globs(
key, cycle):
if tail_globs:
for tail_glob in tail_globs:
globs.append(os.path.join(head, tail_glob))
else:
globs.append(head)
return globs
4 changes: 3 additions & 1 deletion lib/python/rose/suite_engine_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""Suite engine processor management."""

from datetime import datetime, timedelta
from glob import glob
from isodatetime.data import Duration
from isodatetime.parsers import DurationParser
import os
Expand Down Expand Up @@ -260,6 +261,7 @@ class TaskProps(object):
"task_suffix": "ROSE_TASK_SUFFIX",
"cycling_mode": "ROSE_CYCLING_MODE",
"task_cycle_time": "ROSE_TASK_CYCLE_TIME",
"task_log_dir": "ROSE_TASK_LOG_DIR",
"task_log_root": "ROSE_TASK_LOG_ROOT",
"task_is_cold_start": "ROSE_TASK_IS_COLD_START",
"dir_data": "ROSE_DATA",
Expand Down Expand Up @@ -367,7 +369,7 @@ def cmp_suite_conf(self, suite_name, strict_mode=False, debug_mode=False):
"""
raise NotImplementedError()

def get_cycle_items_globs(self, name, cycle, task_glob=None):
def get_cycle_items_globs(self, name, cycle):
"""Return a glob to match named items created for a given cycle.
E.g.:
Expand Down
127 changes: 73 additions & 54 deletions lib/python/rose/suite_engine_procs/cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#-----------------------------------------------------------------------------
"""Logic specific to the Cylc suite engine."""

from isodatetime.parsers import TimePointParser
import filecmp
from fnmatch import fnmatch
from glob import glob
Expand Down Expand Up @@ -62,7 +61,10 @@ class CylcProcessor(SuiteEngineProcessor):
EVENT_RANKS = {"submit-init": 0, "submit": 1, "fail(submit)": 1, "init": 2,
"success": 3, "fail": 3, "fail(%s)": 4}
JOB_LOGS_DB = "log/rose-job-logs.db"
JOB_LOG_TAIL_KEYS = {"": "00-script", "out": "01-out", "err": "02-err"}
JOB_LOG_TAIL_KEYS = {
"job": "00-script",
"job.out": "01-out",
"job.err": "02-err"}
ORDERS = {
"time_desc":
"time DESC, task_events.submit_num DESC, name DESC, cycle DESC",
Expand Down Expand Up @@ -192,7 +194,7 @@ def gcontrol(self, suite_name, host=None, engine_version=None, args=None):
self.popen(fmt % (host, suite_name, args_str, os.devnull),
env=environ, shell=True)

def get_cycle_items_globs(self, name, cycle, task_globs=None):
def get_cycle_items_globs(self, name, cycle):
"""Return a glob to match named items created for a given cycle.
E.g.:
Expand All @@ -202,15 +204,8 @@ def get_cycle_items_globs(self, name, cycle, task_globs=None):
Return None if named item not supported.
"""
if task_globs is None:
task_globs = ["*"]
dict_ = {"datac": "share/data/%(cycle)s",
"work": "work/%(task_glob)s.%(cycle)s"}
globs = []
for task_glob in task_globs:
glob_ = dict_.get(name) % {"cycle": cycle, "task_glob": task_glob}
globs.append(glob_)
return globs
dict_ = {"datac": "share/data/%(cycle)s", "work": "work/%(cycle)s"}
return [dict_.get(name) % {"cycle": cycle}]

def get_suite_dir_rel(self, suite_name, *paths):
"""Return the relative path to the suite running directory.
Expand Down Expand Up @@ -629,6 +624,7 @@ def get_task_props_from_env(self):
task_id=task_id,
task_name=task_name,
task_cycle_time=task_cycle_time,
task_log_dir=os.path.dirname(task_log_root),
task_log_root=task_log_root,
task_is_cold_start=task_is_cold_start,
cycling_mode=cycling_mode)
Expand Down Expand Up @@ -763,40 +759,49 @@ def job_logs_archive(self, suite_name, items):
if cycle:
cycles.append(cycle)
self.job_logs_pull_remote(suite_name, cycles, prune_remote_mode=True)
log_dir = self.get_suite_dir(suite_name, "log")
cwd = os.getcwd()
self.fs_util.chdir(log_dir)
self.fs_util.chdir(self.get_suite_dir(suite_name))
try:
stmt = ("UPDATE log_files SET path=?, path_in_tar=? " +
"WHERE cycle==? AND task==? AND submit_num==? AND key==?")
for cycle in cycles:
archive_file_name0 = "job-" + cycle + ".tar"
archive_file_name0 = os.path.join("log",
"job-" + cycle + ".tar")
archive_file_name = archive_file_name0 + ".gz"
if os.path.exists(archive_file_name):
continue
glob_ = self.TASK_ID_DELIM.join(["*", cycle, "*"])
names = glob(os.path.join("job", glob_))
glob_ = os.path.join(cycle, "*", "*", "*")
names = glob(os.path.join("log", "job", glob_))
if not names:
continue
f_bsize = os.statvfs(".").f_bsize
tar = tarfile.open(archive_file_name0, "w", bufsize=f_bsize)
for name in names:
tar.add(name)
cycle, task, s_n, ext = self._parse_job_log_base_name(name)
if s_n == "NN" or ext == "job.status":
continue
tar.add(name, name.replace("log/", "", 1))
tar.close()
# N.B. Python's gzip is slow
self.popen.run_simple("gzip", "-f", archive_file_name0)
self.handle_event(FileSystemEvent(FileSystemEvent.CREATE,
archive_file_name))
for name in sorted(names):
self.fs_util.delete(name)
self.fs_util.delete(os.path.join("log", "job", cycle))
for name in names:
# cycle, task, submit_num, extension
cycle, task, s_n, ext = self._parse_job_log_base_name(name)
if s_n == "NN" or ext == "job.status":
continue
key = ext
if ext in self.JOB_LOG_TAIL_KEYS:
key = self.JOB_LOG_TAIL_KEYS[ext]
stmt_args = [os.path.join("log", archive_file_name),
name, cycle, task, s_n, key]
stmt_args = [
os.path.join(archive_file_name),
name.replace("log/", "", 1),
cycle,
task,
int(s_n),
key]
self._db_exec(self.JOB_LOGS_DB, None, suite_name,
stmt, stmt_args, commit=True)
finally:
Expand Down Expand Up @@ -842,11 +847,11 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
uuid_file_name = os.path.join(log_dir, uuid)
self.fs_util.touch(uuid_file_name)
try:
glob_auths_map = {}
auths_filters = [] # [(auths, includes, excludes), ...]
if "*" in items:
auths = self.get_suite_jobs_auths(suite_name)
if auths:
glob_auths_map["*"] = self.get_suite_jobs_auths(suite_name)
auths_filters.append((auths, [], []))
else:
for item in items:
cycle, name = self._parse_task_cycle_id(item)
Expand All @@ -856,33 +861,46 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
continue
auths = self.get_suite_jobs_auths(suite_name, cycle, name)
if auths:
glob_names = []
for list_ in [name, cycle, None]:
if list_ is None:
glob_names.append("*")
else:
glob_names.append(list_)
glob_ = self.TASK_ID_DELIM.join(glob_names)
glob_auths_map[glob_] = auths
# FIXME: more efficient if auth is key?
for glob_, auths in glob_auths_map.items():
includes = []
excludes = []
if cycle is None and name is None:
includes = []
excludes = []
elif name is None:
includes = ["/" + cycle]
excludes = ["/*"]
elif cycle is None:
includes = ["/*/" + name]
excludes = ["/*/*"]
else:
includes = ["/" + cycle, "/" + cycle + "/" + name]
excludes = ["/*", "/*/*"]
auths_filters.append((auths, includes, excludes))

for auths, includes, excludes in auths_filters:
for auth in auths:
data = {"auth": auth,
"log_dir_rel": log_dir_rel,
"uuid": uuid,
"glob_": glob_}
"glob_": "*"}
if includes:
data["glob_"] = includes[-1][1:] # Remove leading /
cmd = self.popen.get_cmd(
"ssh", auth,
("cd %(log_dir_rel)s && " +
"(! test -f %(uuid)s && ls -d %(glob_)s)") % data)
ret_code, ssh_ls_out, _ = self.popen.run(*cmd)
if ret_code:
continue
cmd_list = ["rsync"]
for include in includes:
cmd_list.append("--include=" + include)
for exclude in excludes:
cmd_list.append("--exclude=" + exclude)
cmd_list.append("%(auth)s:%(log_dir_rel)s/" % data)
cmd_list.append(log_dir)
try:
cmd = self.popen.get_cmd(
"rsync",
"%(auth)s:%(log_dir_rel)s/%(glob_)s" % data,
log_dir)
cmd = self.popen.get_cmd(*cmd_list)
self.popen(*cmd)
except RosePopenError as exc:
self.handle_event(exc, level=Reporter.WARN)
Expand All @@ -891,14 +909,15 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
try:
cmd = self.popen.get_cmd(
"ssh", auth,
"cd %(log_dir_rel)s && rm -f %(glob_)s" % data)
"cd %(log_dir_rel)s && rm -fr %(glob_)s" % data)
self.popen(*cmd)
except RosePopenError as exc:
self.handle_event(exc, level=Reporter.WARN)
else:
for line in sorted(ssh_ls_out.splitlines()):
event = FileSystemEvent(FileSystemEvent.DELETE,
auth + ":" + line)
event = FileSystemEvent(
FileSystemEvent.DELETE,
"%s:log/job/%s/" % (auth, line))
self.handle_event(event)
finally:
self.fs_util.delete(uuid_file_name)
Expand All @@ -914,19 +933,23 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
if not name:
name = "*"
logs_prefix = self.get_suite_dir(
suite_name,
"log/job/%s.%s." % (name, cycle))
suite_name,
"log/job/%(cycle)s/%(name)s/*/*" % {
"cycle": cycle, "name": name})
for f_name in glob(logs_prefix + "*"):
if f_name.endswith(".status"):
if f_name.endswith("/job.status"):
continue
stat = os.stat(f_name)
rel_f_name = f_name[len(dir_) + 1:]
# cycle, task, submit_num, extension
cycle, task, s_n, ext = self._parse_job_log_base_name(f_name)
cycle, task, s_n, ext = self._parse_job_log_base_name(
rel_f_name)
if s_n == "NN":
continue
key = ext
if ext in self.JOB_LOG_TAIL_KEYS:
key = self.JOB_LOG_TAIL_KEYS[ext]
stmt_args = [cycle, task, s_n, key, rel_f_name, "",
stmt_args = [cycle, task, int(s_n), key, rel_f_name, "",
stat.st_mtime, stat.st_size]
dao.execute(stmt, stmt_args)
dao.commit()
Expand Down Expand Up @@ -1133,14 +1156,10 @@ def _db_init(self, db_name, user_name, suite_name):
self.daos[db_name][key] = DAO(db_f_name)
return self.daos[db_name][key]

def _parse_job_log_base_name(self, f_name):
@classmethod
def _parse_job_log_base_name(cls, f_name):
"""Return (cycle, task, submit_num, ext)."""
b_names = os.path.basename(f_name).split(self.TASK_ID_DELIM, 3)
task, cycle, submit_num = b_names[0:3]
ext = ""
if len(b_names) > 3:
ext = b_names[3]
return (cycle, task, submit_num, ext)
return f_name.replace("log/job/", "").split("/", 3)

def _parse_task_cycle_id(self, item):
"""Parse name.cycle. Return (cycle, name)."""
Expand Down
Loading

0 comments on commit 386d90c

Please sign in to comment.