New command-line access to provenance capabilities
1) New save_provenance tool. Allows users to execute provenance
capabilities from the command line, independent of running the case.
 1.a) Allows users to decide to save timing info after a run without
      having to rerun.
2) Move getTiming implementation to a library so it does not have to
   be invoked via the shell command line.
3) Encapsulate provenance-related code in its own library: provenance.py
   (see the usage sketch after this list)
4) Improve quality of save-timing regression test
5) Add regression test for save_provenance
6) Simplify get_timing call from case_run
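
For orientation, here is a minimal sketch of how the new CIME.provenance and CIME.get_timing libraries can be driven from Python, assuming CIME's python directory is on sys.path (the scripts below get this from standard_script_setup). The imports and call signatures mirror those visible in the save_provenance, build.py, and case_run.py diffs; the case directory path and the control flow around the model run are illustrative only.

# Illustrative sketch; mirrors the imports and calls visible in the diffs below.
from CIME.case import Case
from CIME.provenance import save_prerun_provenance, save_postrun_provenance
from CIME.get_timing import get_timing
from CIME.utils import get_lids

with Case("/path/to/caseroot", read_only=False) as case:  # hypothetical case directory
    save_prerun_provenance(case)                # before the model run
    # ... the model run happens here ...
    for lid in get_lids(case):                  # LIDs for this case, as in save_provenance below
        get_timing(case, lid)                   # library call; no shelling out to the getTiming script
        save_postrun_provenance(case, lid=lid)  # save run (timing) provenance for this LID

The postrun half of this is essentially what the new save_provenance tool below drives from the command line via "save_provenance postrun".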
jgfouca committed Sep 5, 2016
1 parent 2ab2202 commit 5329354
Showing 10 changed files with 958 additions and 790 deletions.
2 changes: 1 addition & 1 deletion scripts/Tools/code_checker
@@ -107,7 +107,7 @@ def _main_func(description):
sys.exit(1 if test_results.failed > 0 else 0)

pylint = find_executable("pylint")
expect(pylint is not None,"pylint not found")
expect(pylint is not None, "pylint not found")

dir_to_check, num_procs, files = parse_command_line(sys.argv, description)

595 changes: 4 additions & 591 deletions scripts/Tools/getTiming

Large diffs are not rendered by default.
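
The slimmed-down getTiming script itself is not rendered here. Given change 2 in the commit message, it is presumably now a thin wrapper over CIME.get_timing; the following is a hypothetical sketch of such a wrapper (not the actual file contents), assuming it keeps the old -lid argument and accepts an optional case directory:

#!/usr/bin/env python
# Hypothetical wrapper sketch; the real getTiming stub is not shown in this diff.
from standard_script_setup import *
from CIME.case import Case
from CIME.get_timing import get_timing

def _main_func():
    parser = argparse.ArgumentParser(description="Extract timing information for a case")
    parser.add_argument("-lid", required=True, help="LID of the run to analyze")
    parser.add_argument("caseroot", nargs="?", default=os.getcwd(), help="Case directory")
    args = parser.parse_args()
    with Case(args.caseroot, read_only=False) as case:
        get_timing(case, args.lid)

if __name__ == "__main__":
    _main_func()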

79 changes: 79 additions & 0 deletions scripts/Tools/save_provenance
@@ -0,0 +1,79 @@
#!/usr/bin/env python

"""
This tool provides command-line access to provenance-saving functionality
"""

from standard_script_setup import *

from CIME.case import Case
from CIME.provenance import *
from CIME.utils import get_lids
from CIME.get_timing import get_timing

###############################################################################
def parse_command_line(args, description):
###############################################################################
parser = argparse.ArgumentParser(
usage="""\n%s <MODE> [<casedir>] [--verbose]
OR
%s --help
OR
%s --test
\033[1mEXAMPLES:\033[0m
\033[1;32m# Save run (timing) provenance for current case \033[0m
> %s postrun
""" % ((os.path.basename(args[0]), ) * 4),
description=description,
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)

CIME.utils.setup_standard_logging_options(parser)

parser.add_argument("mode", choices=("build", "prerun", "postrun"),
help="Phase for which to save provenance. "
"prerun is mostly for infrastructure testing; "
"it does not make sense to store this information manually otherwise")

parser.add_argument("caseroot", nargs="?", default=os.getcwd(),
help="Case directory")

parser.add_argument("-l", "--lid",
help="Force system to save provenance with this LID")

args = parser.parse_args(args[1:])

CIME.utils.handle_standard_logging_options(args)

return args.mode, args.caseroot, args.lid

###############################################################################
def _main_func(description):
###############################################################################
mode, caseroot, lid = parse_command_line(sys.argv, description)
with Case(caseroot, read_only=False) as case:
if mode == "build":
expect(False, "Saving build provenance manually is not currently supported "
"but it should already always be happening automatically")
save_build_provenance(case, lid=lid)
elif mode == "prerun":
expect(lid is not None, "You must provide LID for prerun mode")
save_prerun_provenance(case, lid=lid)
elif mode == "postrun":
expect(lid is None, "Please allow me to autodetect LID")
model = case.get_value("MODEL")
caseid = case.get_value("CASE")
case.set_value("SAVE_TIMING", True)
lids = get_lids(case)
for lid in lids:
# call get_timing if needed
expected_timing_file = os.path.join(caseroot, "timing", "%s_timing.%s.%s.gz" % (model, caseid, lid))
if (not os.path.exists(expected_timing_file)):
get_timing(case, lid)
save_postrun_provenance(case, lid=lid)
else:
expect(False, "Unhandled mode '%s'" % mode)

if __name__ == "__main__":
_main_func(__doc__)
5 changes: 3 additions & 2 deletions utils/python/CIME/build.py
@@ -5,6 +5,7 @@
from CIME.utils import get_model, append_status
from CIME.preview_namelists import preview_namelists
from CIME.check_input_data import check_input_data
from CIME.provenance import save_build_provenance
import glob, shutil, time, threading, gzip, subprocess

logger = logging.getLogger(__name__)
@@ -137,6 +138,8 @@ def post_build(case, logs):

shutil.copy("env_build.xml", "LockedFiles")

save_build_provenance(case)

###############################################################################
def case_build(caseroot, case, sharedlib_only=False, model_only=False):
###############################################################################
@@ -593,5 +596,3 @@ def clean(case, cleanlist=None):
# append call to CaseStatus
msg = "cleanbuild %s "%" ".join(cleanlist)
append_status(msg, caseroot=caseroot, sfile="CaseStatus")

###############################################################################
200 changes: 10 additions & 190 deletions utils/python/CIME/case_run.py
@@ -2,12 +2,14 @@
from CIME.XML.standard_module_setup import *
from CIME.case_submit import submit
from CIME.XML.machines import Machines
from CIME.utils import append_status, touch, gzip_existing_file
from CIME.utils import append_status, gzip_existing_file
from CIME.check_lockedfiles import check_lockedfiles
from CIME.preview_namelists import preview_namelists
from CIME.task_maker import TaskMaker
from CIME.get_timing import get_timing
from CIME.provenance import save_prerun_provenance, save_postrun_provenance

import shutil, time, sys, os, getpass, tarfile, glob, signal
import shutil, time, sys, os, glob

logger = logging.getLogger(__name__)

@@ -131,192 +133,6 @@ def post_run_check(case, lid):
append_status(msg, caseroot=caseroot, sfile="CaseStatus")
expect (False, msg)

###############################################################################
def _get_batch_job_id(case):
###############################################################################
mach = case.get_value("MACH")
if mach == 'titan':
return os.environ("PBS_JOBID")
elif mach in ['edison', 'corip1']:
return os.environ("SLURM_JOB_ID")
elif mach == 'mira':
return os.environ("COBALT_JOBID")
else:
return None

###############################################################################
def save_timing_setup_acme(case, lid):
###############################################################################
if not case.get_value("SAVE_TIMING") or case.get_value("MODEL") != "acme":
return

timing_dir = case.get_value("SAVE_TIMING_DIR")
if timing_dir is None or timing_dir == 'UNSET':
logger.warning("ACME requires SAVE_TIMING_DIR to be set in order to save timings. Skipping save timings")
return
logger.warn("timing dir is %s"%timing_dir)
rundir = case.get_value("RUNDIR")
caseroot = case.get_value("CASEROOT")
cimeroot = case.get_value("CIMEROOT")
base_case = case.get_value("CASEBASEID")
full_timing_dir = os.path.join(timing_dir, "performance_archive", getpass.getuser(), base_case, lid)
expect(not os.path.exists(full_timing_dir), "%s already exists" % full_timing_dir)

os.makedirs(full_timing_dir)
mach = case.get_value("MACH")
compiler = case.get_value("COMPILER")

# For some batch machines save queue info
job_id = _get_batch_job_id(case)
if mach == "mira":
for cmd, filename in [("qstat -lf", "qstatf"), ("qstat -lf %s" % job_id, "qstatf_jobid")]:
run_cmd_no_fail("%s > %s.%s" % (cmd, filename, lid), from_dir=full_timing_dir)
gzip_existing_file(os.path.join(full_timing_dir, filename))
elif mach == ["corip1", "edison"]:
for cmd, filename in [("sqs -f", "sqsf"), ("sqs -w -a", "sqsw"), ("sqs -f %s" % job_id, "sqsf_jobid"), ("squeue", "squeuef")]:
run_cmd_no_fail("%s > %s.%s" % (cmd, filename, lid), from_dir=full_timing_dir)
gzip_existing_file(os.path.join(full_timing_dir, filename))
elif mach == "titan":
for cmd, filename in [("xtdb2proc -f xtdb2proc", "xtdb2procf"),
("qstat -f > qstat", "qstatf"),
("qstat -f %s > qstatf_jobid" % job_id, "qstatf_jobid"),
("xtnodestat > xtnodestat", "xtnodestatf"),
("showq > showqf", "showqf")]:
run_cmd_no_fail(cmd + "." + lid, from_dir=full_timing_dir)
gzip_existing_file(os.path.join(full_timing_dir, filename + "." + lid))

mdiag_reduce = os.path.join(full_timing_dir, "mdiag_reduce." + lid)
run_cmd_no_fail("./mdiag_reduce.csh > %s" % mdiag_reduce, from_dir=os.path.join(caseroot, "Tools"))
gzip_existing_file(mdiag_reduce)

# copy/tar SourceModes
source_mods_dir = os.path.join(caseroot, "SourceMods")
if os.path.isdir(source_mods_dir):
with tarfile.open(os.path.join(full_timing_dir, "SourceMods.%s.tar.gz" % lid), "w:gz") as tfd:
tfd.add(source_mods_dir)

# Save various case configuration items
case_docs = os.path.join(full_timing_dir, "CaseDocs")
os.mkdir(case_docs)
globs_to_copy = [
"CaseDocs/*",
"*.run",
"*.xml",
"user_nl_*",
"*env_mach_specific*",
"Macros",
"README.case",
"Depends.%s" % mach,
"Depends.%s" % compiler,
"Depends.%s.%s" % (mach, compiler),
"software_environment.txt"
]
for glob_to_copy in globs_to_copy:
for item in glob.glob(os.path.join(caseroot, glob_to_copy)):
shutil.copy(item, os.path.join(case_docs, os.path.basename(item) + "." + lid))

if job_id is not None:
sample_interval = case.get_value("SYSLOG_N")
if sample_interval > 0:
archive_checkpoints = os.path.join(full_timing_dir, "checkpoints")
os.mkdir(archive_checkpoints)
touch("%s/acme.log.%s" % (rundir, lid))
syslog_jobid = run_cmd_no_fail("./mach_syslog %d %s %s %s %s/timing/checkpoints %s/checkpoints >& /dev/null & echo $!" %
(sample_interval, job_id, lid, rundir, rundir, archive_checkpoints),
from_dir=os.path.join(caseroot, "Tools"))
with open(os.path.join(rundir, "syslog_jobid", ".%s" % job_id), "w") as fd:
fd.write("%s\n" % syslog_jobid)

# Save state of repo
run_cmd_no_fail("git describe > %s" % os.path.join(full_timing_dir, "GIT_DESCRIBE"), from_dir=cimeroot)

###############################################################################
def save_timing_cesm(case, lid):
###############################################################################
rundir = case.get_value("RUNDIR")
timing_dir = case.get_value("SAVE_TIMING_DIR")
timing_dir = os.path.join(timing_dir, case.get_value("CASE"))
shutil.move(os.path.join(rundir,"timing"),
os.path.join(timing_dir,"timing."+lid))

###############################################################################
def save_timing_acme(case, lid):
###############################################################################
rundir = case.get_value("RUNDIR")
timing_dir = case.get_value("SAVE_TIMING_DIR")
caseroot = case.get_value("CASEROOT")
mach = case.get_value("MACH")
base_case = case.get_value("CASEBASEID")
full_timing_dir = os.path.join(timing_dir, "performance_archive", getpass.getuser(), base_case, lid)

# Kill mach_syslog
job_id = _get_batch_job_id(case)
if job_id is not None:
syslog_jobid_path = os.path.join(rundir, "syslog_jobid", ".%s" % job_id)
if os.path.exists(syslog_jobid_path):
try:
with open(syslog_jobid_path, "r") as fd:
syslog_jobid = int(fd.read().strip())
os.kill(syslog_jobid, signal.SIGTERM)
except (ValueError, OSError) as e:
logger.warning("Failed to kill syslog: %s" % e)
finally:
os.remove(syslog_jobid_path)

# copy/tar timings
with tarfile.open(os.path.join(full_timing_dir, "timing.%s.tar.gz" % lid), "w:gz") as tfd:
tfd.add(os.path.join(rundir, "timing"))

#
# save output files and logs
#
globs_to_copy = []
if mach == "titan":
globs_to_copy.append("%s*OU" % job_id)
elif mach == "mira":
globs_to_copy.append("%s*output" % job_id)
globs_to_copy.append("%s*cobaltlog" % job_id)
elif mach in ["edison", "corip1"]:
globs_to_copy.append("%s" % case.get_value("CASE"))

globs_to_copy.append("logs/acme.log.%s.gz" % lid)
globs_to_copy.append("logs/cpl.log.%s.gz" % lid)
globs_to_copy.append("timing/*.%s" % lid)
globs_to_copy.append("CaseStatus")

for glob_to_copy in globs_to_copy:
for item in glob.glob(os.path.join(caseroot, glob_to_copy)):
shutil.copy(item, full_timing_dir)

###############################################################################
def get_timings(case, lid):
###############################################################################
check_timing = case.get_value("CHECK_TIMING")
if check_timing:
caseroot = case.get_value("CASEROOT")
timingDir = os.path.join(caseroot, "timing")
if not os.path.isdir(timingDir):
os.makedirs(timingDir)

logger.info("Running timing script %s " %(os.path.join(caseroot, "Tools", "getTiming")))
run_cmd_no_fail("%s -lid %s " % (os.path.join(caseroot, "Tools", "getTiming"), lid))

# save the timing files if desired. Some of the details here are
# model dependent.
model = case.get_value("MODEL")
save_timing = case.get_value("SAVE_TIMING")
if save_timing:
if model == "acme":
save_timing_acme(case, lid)
else:
save_timing_cesm(case, lid)

# compress relevant timing files
logger.info( "gzipping timing stats.." )
timingfile = os.path.join(timingDir, model + "_timing_stats." + lid)
gzip_existing_file(timingfile)
logger.info("Done with timings")

###############################################################################
def save_logs(case, lid):
###############################################################################
@@ -393,17 +209,21 @@ def case_run(case):
lid = time.strftime("%y%m%d-%H%M%S")
os.environ["LID"] = lid

save_timing_setup_acme(case, lid)
save_prerun_provenance(case)

for _ in range(data_assimilation_cycles):
pre_run_check(case)
run_model(case)
post_run_check(case, lid)
save_logs(case, lid) # Copy log files back to caseroot
get_timings(case, lid) # Run the getTiming script
if case.get_value("CHECK_TIMING") or case.get_value("SAVE_TIMING"):
get_timing(case, lid) # Run the getTiming script

if data_assimilation:
do_data_assimilation(data_assimilation_script, lid)

save_postrun_provenance(case)

logger.warn("check for resubmit")
resubmit_check(case)
