Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suite log/job/ and work/ restructure #1365

Merged
merged 2 commits into from
Aug 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/rose-configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,8 @@ <h2 id="appendix-rose-ana-config">Appendix: rose-ana configuration
[Compare Gradiant]
comparison=Exact
extract=OutputGrepper:' gradJ= \s*(\S+)'
kgo1file=$SUITE_CONTROL_DIR/log/job/{}.1.1.out
resultfile=$ROSE_SUITE_DIR/log/job/{}.1.1.out
kgo1file=$SUITE_CONTROL_DIR/log/job/1/{}/NN/out
resultfile=$ROSE_SUITE_DIR/log/job/1/{}/NN/out
</pre>
</div>
</body>
Expand Down
40 changes: 21 additions & 19 deletions lib/python/rose/apps/rose_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RosePruneApp(BuiltinApp):
SCHEME = "rose_prune"
SECTION = "prune"

def run(self, app_runner, conf_tree, *_):
def run(self, app_runner, conf_tree, opts, args, uuid, work_files):
"""Suite housekeeping application.

This application is designed to work under "rose task-run" in a cycling
Expand All @@ -60,25 +60,9 @@ def run(self, app_runner, conf_tree, *_):
if archive_logs_cycles:
app_runner.suite_engine_proc.job_logs_archive(
suite_name, archive_logs_cycles)
globs = []
globs = (self._get_prune_globs(app_runner, conf_tree, "datac") +
self._get_prune_globs(app_runner, conf_tree, "work"))
suite_engine_proc = app_runner.suite_engine_proc
for key, max_args in [("datac", 1), ("work", 2)]:
for cycle, cycle_args in self._get_conf(conf_tree,
"prune-" + key + "-at",
max_args=max_args):
tail_globs = None
if cycle_args:
tail_globs = shlex.split(cycle_args.pop())
head_globs = None
if cycle_args:
head_globs = shlex.split(cycle_args.pop())
for head in suite_engine_proc.get_cycle_items_globs(
key, cycle, head_globs):
if tail_globs:
for tail_glob in tail_globs:
globs.append(os.path.join(head, tail_glob))
else:
globs.append(head)
hosts = suite_engine_proc.get_suite_jobs_auths(suite_name)
suite_dir_rel = suite_engine_proc.get_suite_dir_rel(suite_name)
form_dict = {"d": suite_dir_rel, "g": " ".join(globs)}
Expand Down Expand Up @@ -164,3 +148,21 @@ def _get_conf(self, conf_tree, key, max_args=0):
else:
items.append(cycle)
return items

def _get_prune_globs(self, app_runner, conf_tree, key):
"""Return prune globs for "key"."""
globs = []
for cycle, cycle_args in self._get_conf(conf_tree,
"prune-" + key + "-at",
max_args=1):
tail_globs = None
if cycle_args:
tail_globs = shlex.split(cycle_args.pop())
for head in app_runner.suite_engine_proc.get_cycle_items_globs(
key, cycle):
if tail_globs:
for tail_glob in tail_globs:
globs.append(os.path.join(head, tail_glob))
else:
globs.append(head)
return globs
4 changes: 3 additions & 1 deletion lib/python/rose/suite_engine_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""Suite engine processor management."""

from datetime import datetime, timedelta
from glob import glob
from isodatetime.data import Duration
from isodatetime.parsers import DurationParser
import os
Expand Down Expand Up @@ -260,6 +261,7 @@ class TaskProps(object):
"task_suffix": "ROSE_TASK_SUFFIX",
"cycling_mode": "ROSE_CYCLING_MODE",
"task_cycle_time": "ROSE_TASK_CYCLE_TIME",
"task_log_dir": "ROSE_TASK_LOG_DIR",
"task_log_root": "ROSE_TASK_LOG_ROOT",
"task_is_cold_start": "ROSE_TASK_IS_COLD_START",
"dir_data": "ROSE_DATA",
Expand Down Expand Up @@ -367,7 +369,7 @@ def cmp_suite_conf(self, suite_name, strict_mode=False, debug_mode=False):
"""
raise NotImplementedError()

def get_cycle_items_globs(self, name, cycle, task_glob=None):
def get_cycle_items_globs(self, name, cycle):
"""Return a glob to match named items created for a given cycle.

E.g.:
Expand Down
127 changes: 73 additions & 54 deletions lib/python/rose/suite_engine_procs/cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#-----------------------------------------------------------------------------
"""Logic specific to the Cylc suite engine."""

from isodatetime.parsers import TimePointParser
import filecmp
from fnmatch import fnmatch
from glob import glob
Expand Down Expand Up @@ -62,7 +61,10 @@ class CylcProcessor(SuiteEngineProcessor):
EVENT_RANKS = {"submit-init": 0, "submit": 1, "fail(submit)": 1, "init": 2,
"success": 3, "fail": 3, "fail(%s)": 4}
JOB_LOGS_DB = "log/rose-job-logs.db"
JOB_LOG_TAIL_KEYS = {"": "00-script", "out": "01-out", "err": "02-err"}
JOB_LOG_TAIL_KEYS = {
"job": "00-script",
"job.out": "01-out",
"job.err": "02-err"}
ORDERS = {
"time_desc":
"time DESC, task_events.submit_num DESC, name DESC, cycle DESC",
Expand Down Expand Up @@ -192,7 +194,7 @@ def gcontrol(self, suite_name, host=None, engine_version=None, args=None):
self.popen(fmt % (host, suite_name, args_str, os.devnull),
env=environ, shell=True)

def get_cycle_items_globs(self, name, cycle, task_globs=None):
def get_cycle_items_globs(self, name, cycle):
"""Return a glob to match named items created for a given cycle.

E.g.:
Expand All @@ -202,15 +204,8 @@ def get_cycle_items_globs(self, name, cycle, task_globs=None):
Return None if named item not supported.

"""
if task_globs is None:
task_globs = ["*"]
dict_ = {"datac": "share/data/%(cycle)s",
"work": "work/%(task_glob)s.%(cycle)s"}
globs = []
for task_glob in task_globs:
glob_ = dict_.get(name) % {"cycle": cycle, "task_glob": task_glob}
globs.append(glob_)
return globs
dict_ = {"datac": "share/data/%(cycle)s", "work": "work/%(cycle)s"}
return [dict_.get(name) % {"cycle": cycle}]

def get_suite_dir_rel(self, suite_name, *paths):
"""Return the relative path to the suite running directory.
Expand Down Expand Up @@ -629,6 +624,7 @@ def get_task_props_from_env(self):
task_id=task_id,
task_name=task_name,
task_cycle_time=task_cycle_time,
task_log_dir=os.path.dirname(task_log_root),
task_log_root=task_log_root,
task_is_cold_start=task_is_cold_start,
cycling_mode=cycling_mode)
Expand Down Expand Up @@ -763,40 +759,49 @@ def job_logs_archive(self, suite_name, items):
if cycle:
cycles.append(cycle)
self.job_logs_pull_remote(suite_name, cycles, prune_remote_mode=True)
log_dir = self.get_suite_dir(suite_name, "log")
cwd = os.getcwd()
self.fs_util.chdir(log_dir)
self.fs_util.chdir(self.get_suite_dir(suite_name))
try:
stmt = ("UPDATE log_files SET path=?, path_in_tar=? " +
"WHERE cycle==? AND task==? AND submit_num==? AND key==?")
for cycle in cycles:
archive_file_name0 = "job-" + cycle + ".tar"
archive_file_name0 = os.path.join("log",
"job-" + cycle + ".tar")
archive_file_name = archive_file_name0 + ".gz"
if os.path.exists(archive_file_name):
continue
glob_ = self.TASK_ID_DELIM.join(["*", cycle, "*"])
names = glob(os.path.join("job", glob_))
glob_ = os.path.join(cycle, "*", "*", "*")
names = glob(os.path.join("log", "job", glob_))
if not names:
continue
f_bsize = os.statvfs(".").f_bsize
tar = tarfile.open(archive_file_name0, "w", bufsize=f_bsize)
for name in names:
tar.add(name)
cycle, task, s_n, ext = self._parse_job_log_base_name(name)
if s_n == "NN" or ext == "job.status":
continue
tar.add(name, name.replace("log/", "", 1))
tar.close()
# N.B. Python's gzip is slow
self.popen.run_simple("gzip", "-f", archive_file_name0)
self.handle_event(FileSystemEvent(FileSystemEvent.CREATE,
archive_file_name))
for name in sorted(names):
self.fs_util.delete(name)
self.fs_util.delete(os.path.join("log", "job", cycle))
for name in names:
# cycle, task, submit_num, extension
cycle, task, s_n, ext = self._parse_job_log_base_name(name)
if s_n == "NN" or ext == "job.status":
continue
key = ext
if ext in self.JOB_LOG_TAIL_KEYS:
key = self.JOB_LOG_TAIL_KEYS[ext]
stmt_args = [os.path.join("log", archive_file_name),
name, cycle, task, s_n, key]
stmt_args = [
os.path.join(archive_file_name),
name.replace("log/", "", 1),
cycle,
task,
int(s_n),
key]
self._db_exec(self.JOB_LOGS_DB, None, suite_name,
stmt, stmt_args, commit=True)
finally:
Expand Down Expand Up @@ -842,11 +847,11 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
uuid_file_name = os.path.join(log_dir, uuid)
self.fs_util.touch(uuid_file_name)
try:
glob_auths_map = {}
auths_filters = [] # [(auths, includes, excludes), ...]
if "*" in items:
auths = self.get_suite_jobs_auths(suite_name)
if auths:
glob_auths_map["*"] = self.get_suite_jobs_auths(suite_name)
auths_filters.append((auths, [], []))
else:
for item in items:
cycle, name = self._parse_task_cycle_id(item)
Expand All @@ -856,33 +861,46 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
continue
auths = self.get_suite_jobs_auths(suite_name, cycle, name)
if auths:
glob_names = []
for list_ in [name, cycle, None]:
if list_ is None:
glob_names.append("*")
else:
glob_names.append(list_)
glob_ = self.TASK_ID_DELIM.join(glob_names)
glob_auths_map[glob_] = auths
# FIXME: more efficient if auth is key?
for glob_, auths in glob_auths_map.items():
includes = []
excludes = []
if cycle is None and name is None:
includes = []
excludes = []
elif name is None:
includes = ["/" + cycle]
excludes = ["/*"]
elif cycle is None:
includes = ["/*/" + name]
excludes = ["/*/*"]
else:
includes = ["/" + cycle, "/" + cycle + "/" + name]
excludes = ["/*", "/*/*"]
auths_filters.append((auths, includes, excludes))

for auths, includes, excludes in auths_filters:
for auth in auths:
data = {"auth": auth,
"log_dir_rel": log_dir_rel,
"uuid": uuid,
"glob_": glob_}
"glob_": "*"}
if includes:
data["glob_"] = includes[-1][1:] # Remove leading /
cmd = self.popen.get_cmd(
"ssh", auth,
("cd %(log_dir_rel)s && " +
"(! test -f %(uuid)s && ls -d %(glob_)s)") % data)
ret_code, ssh_ls_out, _ = self.popen.run(*cmd)
if ret_code:
continue
cmd_list = ["rsync"]
for include in includes:
cmd_list.append("--include=" + include)
for exclude in excludes:
cmd_list.append("--exclude=" + exclude)
cmd_list.append("%(auth)s:%(log_dir_rel)s/" % data)
cmd_list.append(log_dir)
try:
cmd = self.popen.get_cmd(
"rsync",
"%(auth)s:%(log_dir_rel)s/%(glob_)s" % data,
log_dir)
cmd = self.popen.get_cmd(*cmd_list)
self.popen(*cmd)
except RosePopenError as exc:
self.handle_event(exc, level=Reporter.WARN)
Expand All @@ -891,14 +909,15 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
try:
cmd = self.popen.get_cmd(
"ssh", auth,
"cd %(log_dir_rel)s && rm -f %(glob_)s" % data)
"cd %(log_dir_rel)s && rm -fr %(glob_)s" % data)
self.popen(*cmd)
except RosePopenError as exc:
self.handle_event(exc, level=Reporter.WARN)
else:
for line in sorted(ssh_ls_out.splitlines()):
event = FileSystemEvent(FileSystemEvent.DELETE,
auth + ":" + line)
event = FileSystemEvent(
FileSystemEvent.DELETE,
"%s:log/job/%s/" % (auth, line))
self.handle_event(event)
finally:
self.fs_util.delete(uuid_file_name)
Expand All @@ -914,19 +933,23 @@ def job_logs_pull_remote(self, suite_name, items, prune_remote_mode=False):
if not name:
name = "*"
logs_prefix = self.get_suite_dir(
suite_name,
"log/job/%s.%s." % (name, cycle))
suite_name,
"log/job/%(cycle)s/%(name)s/*/*" % {
"cycle": cycle, "name": name})
for f_name in glob(logs_prefix + "*"):
if f_name.endswith(".status"):
if f_name.endswith("/job.status"):
continue
stat = os.stat(f_name)
rel_f_name = f_name[len(dir_) + 1:]
# cycle, task, submit_num, extension
cycle, task, s_n, ext = self._parse_job_log_base_name(f_name)
cycle, task, s_n, ext = self._parse_job_log_base_name(
rel_f_name)
if s_n == "NN":
continue
key = ext
if ext in self.JOB_LOG_TAIL_KEYS:
key = self.JOB_LOG_TAIL_KEYS[ext]
stmt_args = [cycle, task, s_n, key, rel_f_name, "",
stmt_args = [cycle, task, int(s_n), key, rel_f_name, "",
stat.st_mtime, stat.st_size]
dao.execute(stmt, stmt_args)
dao.commit()
Expand Down Expand Up @@ -1133,14 +1156,10 @@ def _db_init(self, db_name, user_name, suite_name):
self.daos[db_name][key] = DAO(db_f_name)
return self.daos[db_name][key]

def _parse_job_log_base_name(self, f_name):
@classmethod
def _parse_job_log_base_name(cls, f_name):
"""Return (cycle, task, submit_num, ext)."""
b_names = os.path.basename(f_name).split(self.TASK_ID_DELIM, 3)
task, cycle, submit_num = b_names[0:3]
ext = ""
if len(b_names) > 3:
ext = b_names[3]
return (cycle, task, submit_num, ext)
return f_name.replace("log/job/", "").split("/", 3)

def _parse_task_cycle_id(self, item):
"""Parse name.cycle. Return (cycle, name)."""
Expand Down
Loading