Skip to content

Commit

Permalink
Merge pull request #1753 from ESMCI/mfdeakin-sandia/case_submit/prereq
Browse files Browse the repository at this point in the history
Add support for arbitrary prerequisites to case.submit
This PR implements the --prereq option for case.submit, and also has some minor fixes of env_batch.py for some machines

Test suite: scripts_regression_tests
Test baseline:
Test namelist changes:
Test status: ok (tenatively; passes the ones I'd expect to fail)

Fixes #1710

User interface changes?: N

Update gh-pages html (Y/N)?: N

Code review: @jgfouca @jedwards4b
  • Loading branch information
jedwards4b authored Oct 25, 2017
2 parents b8190ea + 41beb58 commit 5f7359b
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 89 deletions.
103 changes: 55 additions & 48 deletions config/acme/machines/config_batch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,28 @@
</directives>
</batch_system>

<batch_system type="cobalt" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_env>-v</batch_env>
<batch_directive></batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
<walltime_format>%H:%M:%s</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
<batch_mail_type></batch_mail_type>
<submit_args>
<arg flag="--cwd" name="CASEROOT"/>
<arg flag="-A" name="PROJECT"/>
<arg flag="-t" name="JOB_WALLCLOCK_TIME"/>
<arg flag="-n" name=" ($TOTALPES + $MAX_MPITASKS_PER_NODE - 1)/$MAX_MPITASKS_PER_NODE"/>
<arg flag="-q" name="JOB_QUEUE"/>
<arg flag="--mode script"/>
</submit_args>
</batch_system>
<batch_system type="cobalt" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_env>-v</batch_env>
<batch_directive></batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%s</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
<batch_mail_type></batch_mail_type>
<submit_args>
<arg flag="--cwd" name="CASEROOT"/>
<arg flag="-A" name="PROJECT"/>
<arg flag="-t" name="JOB_WALLCLOCK_TIME"/>
<arg flag="-n" name=" ($TOTALPES + $MAX_MPITASKS_PER_NODE - 1)/MAX_MPITASKS_PER_NODE"/>
<arg flag="-q" name="JOB_QUEUE"/>
<arg flag="--mode script"/>
</submit_args>
</batch_system>

<batch_system type="cobalt_theta" >
<batch_query>qstat</batch_query>
Expand All @@ -64,7 +65,8 @@
<batch_env>-v</batch_env>
<batch_directive>#COBALT</batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
<depend_string>--dependencies jobid</depend_string>
<depend_separator>:</depend_separator>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
<batch_mail_type></batch_mail_type>
Expand All @@ -84,7 +86,8 @@
<batch_redirect>&lt;</batch_redirect>
<batch_directive>#BSUB</batch_directive>
<jobid_pattern>&lt;(\d+)&gt;</jobid_pattern>
<depend_string> -w 'done(jobid)'</depend_string>
<depend_string>-w 'done(jobid)'</depend_string>
<depend_separator>&amp;&amp;</depend_separator>
<walltime_format>%H:%M</walltime_format>
<batch_mail_flag>-u</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
Expand Down Expand Up @@ -114,7 +117,8 @@
<batch_env>-v</batch_env>
<batch_directive>#PBS</batch_directive>
<jobid_pattern>^(\S+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
<depend_string>-W depend=afterok:jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag>-m</batch_mail_type_flag>
Expand All @@ -140,7 +144,8 @@
<batch_cancel>canceljob</batch_cancel>
<batch_directive>#MSUB</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
<depend_string>-W depend=afterok:jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag>-m</batch_mail_type_flag>
Expand All @@ -165,6 +170,7 @@
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> -l depend=jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>--mail-user</batch_mail_flag>
<batch_mail_type_flag>--mail-type</batch_mail_type_flag>
Expand All @@ -187,29 +193,30 @@
</batch_system>
<!-- for lawrence livermore computing -->

<batch_system type="slurm" >
<batch_query per_job_arg="-j">squeue</batch_query>
<batch_submit>sbatch</batch_submit>
<batch_cancel>scancel</batch_cancel>
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> --dependency=afterok:jobid</depend_string>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>--mail-user</batch_mail_flag>
<batch_mail_type_flag>--mail-type</batch_mail_type_flag>
<batch_mail_type>none, all, begin, end, fail</batch_mail_type>
<submit_args>
<arg flag="--time" name="$JOB_WALLCLOCK_TIME"/>
<arg flag="-p" name="$JOB_QUEUE"/>
<arg flag="--account" name="$PROJECT"/>
</submit_args>
<directives>
<directive> --job-name={{ job_id }}</directive>
<directive> --nodes={{ num_nodes }}</directive>
<directive> --output={{ output_error_path }}.%j </directive>
<directive> --exclusive </directive>
</directives>
</batch_system>
<batch_system type="slurm" >
<batch_query per_job_arg="-j">squeue</batch_query>
<batch_submit>sbatch</batch_submit>
<batch_cancel>scancel</batch_cancel>
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string>--dependency=afterok:jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>--mail-user</batch_mail_flag>
<batch_mail_type_flag>--mail-type</batch_mail_type_flag>
<batch_mail_type>none, all, begin, end, fail</batch_mail_type>
<submit_args>
<arg flag="--time" name="$JOB_WALLCLOCK_TIME"/>
<arg flag="-p" name="$JOB_QUEUE"/>
<arg flag="--account" name="$PROJECT"/>
</submit_args>
<directives>
<directive> --job-name={{ job_id }}</directive>
<directive> --nodes={{ num_nodes }}</directive>
<directive> --output={{ output_error_path }}.%j </directive>
<directive> --exclusive </directive>
</directives>
</batch_system>

<!-- blues is PBS -->
<batch_system MACH="blues" type="pbs" >
Expand Down
4 changes: 4 additions & 0 deletions config/xml_schemas/config_batch.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<xs:element name="batch_directive" type="xs:string"/>
<xs:element name="jobid_pattern" type="xs:string"/>
<xs:element name="depend_string" type="xs:string"/>
<xs:element name="depend_separator" type="xs:string"/>
<xs:element name="walltime_format" type="xs:string"/>
<xs:element name="batch_mail_flag" type="xs:string"/>
<xs:element name="batch_mail_type_flag" type="xs:string"/>
Expand Down Expand Up @@ -68,6 +69,9 @@
a previous job has completed successfully -->
<xs:element minOccurs="0" ref="depend_string"/>

<!-- depend_separator: How to separate multiple batch job dependencies -->
<xs:element minOccurs="0" ref="depend_separator"/>

<!-- walltime_format: time format expected by batch system for the wall clock time field -->
<xs:element minOccurs="0" ref="walltime_format"/>

Expand Down
13 changes: 6 additions & 7 deletions scripts/Tools/case.submit
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,23 @@ OR

args = CIME.utils.parse_args_and_handle_standard_logging_options(args, parser)

CIME.utils.expect(args.prereq is None, "--prereq not currently supported")

return args.test, args.caseroot, args.job, args.no_batch, args.resubmit, \
args.skip_preview_namelist, args.mail_user, args.mail_type, \
return args.test, args.caseroot, args.job, args.no_batch, args.prereq, \
args.resubmit, args.skip_preview_namelist, args.mail_user, args.mail_type, \
args.batch_args

###############################################################################
def _main_func(description):
###############################################################################
test, caseroot, job, no_batch, resubmit, skip_pnl, \
test, caseroot, job, no_batch, prereq, resubmit, skip_pnl, \
mail_user, mail_type, batch_args = parse_command_line(sys.argv, description)
if test:
test_results = doctest.testmod(verbose=True)
sys.exit(1 if test_results.failed > 0 else 0)

with Case(caseroot, read_only=False) as case:
submit(case, job=job, no_batch=no_batch, resubmit=resubmit, skip_pnl=skip_pnl,
mail_user=mail_user, mail_type=mail_type, batch_args=batch_args)
submit(case, job=job, no_batch=no_batch, prereq=prereq, resubmit=resubmit,
skip_pnl=skip_pnl, mail_user=mail_user, mail_type=mail_type,
batch_args=batch_args)

if __name__ == "__main__":
_main_func(__doc__)
44 changes: 20 additions & 24 deletions scripts/lib/CIME/XML/env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(self, case_root=None, infile="env_batch.xml"):
"""
initialize an object interface to file env_batch.xml in the case directory
"""
self._prereq_jobid = None
self._batchtype = None
# This arbitrary setting should always be overwritten
self._default_walltime = "00:20:00"
Expand Down Expand Up @@ -316,9 +315,9 @@ def get_submit_args(self, case, job):

return submitargs

def submit_jobs(self, case, no_batch=False, job=None, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None,
dry_run=False):
def submit_jobs(self, case, no_batch=False, job=None, user_prereq=None,
skip_pnl=False, mail_user=None, mail_type='never',
batch_args=None, dry_run=False):
alljobs = self.get_jobs()
startindex = 0
jobs = []
Expand Down Expand Up @@ -355,25 +354,16 @@ def submit_jobs(self, case, no_batch=False, job=None, skip_pnl=False,
deps = dependency.split()
else:
deps = []
jobid = ""
if self._prereq_jobid is not None:
jobid = self._prereq_jobid
dep_jobs = []
if user_prereq is not None:
dep_jobs.append(user_prereq)
for dep in deps:
if dep in depid and depid[dep] is not None:
jobid += " " + str(depid[dep])
#TODO: doubt these will be used
# elif dep == "and":
# jobid += " && "
# elif dep == "or":
# jobid += " || "
if dep in depid.keys() and depid[dep] is not None:
dep_jobs.append(str(depid[dep]))


slen = len(jobid)
if slen == 0:
jobid = None

logger.warning("job is {}".format(job))
result = self._submit_single_job(case, job, jobid,
logger.warning("job {} depends on {}".format(job, dep_jobs))
result = self._submit_single_job(case, job,
dep_jobs=dep_jobs,
no_batch=no_batch,
skip_pnl=skip_pnl,
mail_user=mail_user,
Expand All @@ -391,7 +381,7 @@ def submit_jobs(self, case, no_batch=False, job=None, skip_pnl=False,
else:
return depid

def _submit_single_job(self, case, job, depid=None, no_batch=False,
def _submit_single_job(self, case, job, dep_jobs=None, no_batch=False,
skip_pnl=False, mail_user=None, mail_type='never',
batch_args=None, dry_run=False):
logger.warning("Submit job {}".format(job))
Expand All @@ -415,9 +405,15 @@ def _submit_single_job(self, case, job, depid=None, no_batch=False,
if args_override:
submitargs = args_override

if depid is not None:
if dep_jobs is not None and len(dep_jobs) > 0:
logger.info("dependencies: {}".format(dep_jobs))
dep_string = self.get_value("depend_string", subgroup=None)
dep_string = dep_string.replace("jobid",depid.strip()) # pylint: disable=maybe-no-member
separator_string = self.get_value("depend_separator", subgroup=None)
expect("jobid" in dep_string, "depend_string is missing jobid for prerequisite jobs")
dep_ids_str = str(dep_jobs[0])
for dep_id in dep_jobs[1:]:
dep_ids_str += separator_string + str(dep_id)
dep_string = dep_string.replace("jobid",dep_ids_str.strip()) # pylint: disable=maybe-no-member
submitargs += " " + dep_string

if batch_args is not None:
Expand Down
4 changes: 2 additions & 2 deletions scripts/lib/CIME/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,11 +1165,11 @@ def _get_comp_user_mods(self, component):
else:
return comp_user_mods

def submit_jobs(self, no_batch=False, job=None, skip_pnl=False,
def submit_jobs(self, no_batch=False, job=None, prereq=None, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None,
dry_run=False):
env_batch = self.get_env('batch')
return env_batch.submit_jobs(self, no_batch=no_batch, job=job,
return env_batch.submit_jobs(self, no_batch=no_batch, job=job, user_prereq=prereq,
skip_pnl=skip_pnl, mail_user=mail_user,
mail_type=mail_type, batch_args=batch_args,
dry_run=dry_run)
Expand Down
18 changes: 10 additions & 8 deletions scripts/lib/CIME/case_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

logger = logging.getLogger(__name__)

def _submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None):
def _submit(case, job=None, no_batch=False, prereq=None, resubmit=False,
skip_pnl=False, mail_user=None, mail_type='never', batch_args=None):
if job is None:
if case.get_value("TEST"):
job = "case.test"
Expand Down Expand Up @@ -64,8 +64,8 @@ def _submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,

logger.warning("submit_jobs {}".format(job))
job_ids = case.submit_jobs(no_batch=no_batch, job=job, skip_pnl=skip_pnl,
mail_user=mail_user, mail_type=mail_type,
batch_args=batch_args)
prereq=prereq, mail_user=mail_user,
mail_type=mail_type, batch_args=batch_args)

xml_jobids = []
for jobname, jobid in job_ids.items():
Expand All @@ -77,8 +77,8 @@ def _submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
if xml_jobid_text:
case.set_value("JOB_IDS", xml_jobid_text)

def submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None):
def submit(case, job=None, no_batch=False, prereq=None, resubmit=False,
skip_pnl=False, mail_user=None, mail_type='never', batch_args=None):
if case.get_value("TEST"):
caseroot = case.get_value("CASEROOT")
casebaseid = case.get_value("CASEBASEID")
Expand All @@ -93,8 +93,10 @@ def submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
ts.set_status(SUBMIT_PHASE, TEST_PASS_STATUS)

try:
functor = lambda: _submit(case, job, resubmit, no_batch, skip_pnl,
mail_user, mail_type, batch_args)
functor = lambda: _submit(case, job=job, no_batch=no_batch, prereq=prereq,
resubmit=resubmit, skip_pnl=skip_pnl,
mail_user=mail_user, mail_type=mail_type,
batch_args=batch_args)
run_and_log_case_status(functor, "case.submit", caseroot=case.get_value("CASEROOT"))
except:
# If something failed in the batch system, make sure to mark
Expand Down
43 changes: 43 additions & 0 deletions scripts/tests/scripts_regression_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from six import assertRaisesRegex
import six

import collections

from CIME.utils import run_cmd, run_cmd_no_fail, get_lids, get_current_commit
import update_acme_tests
import CIME.test_scheduler, CIME.wait_for_tests
Expand Down Expand Up @@ -1394,6 +1396,47 @@ def test_cime_case(self):
# Test some test properties
self.assertEqual(case.get_value("TESTCASE"), "TESTRUNPASS")

###########################################################################
def test_cime_case_prereq(self):
###########################################################################
testcase_name = 'prereq_test'
testdir = os.path.join(TEST_ROOT, testcase_name)
if os.path.exists(testdir):
shutil.rmtree(testdir)
run_cmd_assert_result(self, ("%s/create_newcase --case %s --script-root %s --compset X --res f19_g16 --output-root %s"
% (SCRIPT_DIR, testcase_name, testdir, testdir)),
from_dir=SCRIPT_DIR)

with Case(testdir, read_only=False) as case:
job_name = "case.run"
prereq_name = 'prereq_test'
batch_commands = case.submit_jobs(prereq=prereq_name, job=job_name, skip_pnl=True, dry_run=True)
self.assertTrue(isinstance(batch_commands, collections.Sequence), "case.submit_jobs did not return a sequence for a dry run")
self.assertTrue(len(batch_commands) > 0, "case.submit_jobs did not return any job submission string")
# The first element in the internal sequence should just be the job name
# The second one (batch_cmd_index) should be the actual batch submission command
batch_cmd_index = 1
# The prerequisite should be applied to all jobs, though we're only expecting one
for batch_cmd in batch_commands:
self.assertTrue(isinstance(batch_cmd, collections.Sequence), "case.submit_jobs did not return a sequence of sequences")
self.assertTrue(len(batch_cmd) > batch_cmd_index, "case.submit_jobs returned internal sequences with length <= {}".format(batch_cmd_index))
self.assertTrue(isinstance(batch_cmd[1], str), "case.submit_jobs returned internal sequences without the batch command string as the second parameter: {}".format(batch_cmd[1]))
batch_cmd_args = batch_cmd[1]

jobid_ident = 'jobid'
dep_str_fmt = case.get_env('batch').get_value('depend_string', subgroup=None)
self.assertTrue(jobid_ident in dep_str_fmt, "dependency string doesn't include the jobid identifier {}".format(jobid_ident))
dep_str = dep_str_fmt[:-len(jobid_ident)]

while dep_str in batch_cmd_args:
dep_id_pos = batch_cmd_args.find(dep_str) + len(dep_str)
batch_cmd_args = batch_cmd_args[dep_id_pos:]
prereq_substr = batch_cmd_args[:len(prereq_name)]
if prereq_substr == prereq_name:
break

self.assertTrue(prereq_name in prereq_substr, "Dependencies added, but not the user specified one")

###########################################################################
def test_cime_case_build_threaded_1(self):
###########################################################################
Expand Down

0 comments on commit 5f7359b

Please sign in to comment.