Skip to content

Commit

Permalink
Merge pull request #1450 from ESMCI/jgfouca/upgrade_queue_selection
Browse files Browse the repository at this point in the history
Update queue selection to take walltime into account

Adds concept of strict walltime.

The idea here is to have better support for machines like blues that have both a "debug" queue and a "standard" queue. The "debug" queue has strict limits on both walltime and num_pes, and therefore should not be selected as the user's queue if they asked for a long walltime. For other machines, the walltimemax setting is being used more like a default walltime than a true max.

Test suite: scripts_regression_tests (melvin and skybridge) and some by-hand testing on blues
Test baseline:
Test namelist changes:
Test status: bit for bit

Fixes #1255

User interface changes?: Changes in how walltime is handled

Code review: @jedwards4b @jayeshkrishna @rljacob
  • Loading branch information
jgfouca authored May 2, 2017
2 parents 5619a82 + 26f0ffc commit ce52822
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 39 deletions.
10 changes: 5 additions & 5 deletions config/acme/machines/config_batch.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
<directive>-l nodes={{ num_nodes }}:ppn={{ tasks_per_node }}</directive>
</directives>
<queues>
<queue walltimemax="01:00:00" jobmin="1" jobmax="64">shared</queue>
<queue walltimemax="01:00:00" jobmin="1" jobmax="64" strict="true">shared</queue>
<queue walltimemax="03:00:00" jobmin="64" jobmax="4096" default="true">batch</queue>
</queues>
</batch_system>
Expand All @@ -177,8 +177,8 @@
<!-- edison is SLURM as of Jan-4-2016 -->
<batch_system MACH="edison" type="slurm" >
<queues>
<queue walltimemax="00:30:00" jobmin="1" jobmax="12288" strict="true">debug</queue>
<queue walltimemax="01:30:00" jobmin="1" jobmax="150000" default="true">regular</queue>
<queue walltimemax="00:30:00" jobmin="1" jobmax="12288">debug</queue>
</queues>
</batch_system>

Expand All @@ -198,8 +198,8 @@
<directive> --constraint=haswell</directive>
</directives>
<queues>
<queue walltimemax="00:30:00" jobmin="1" jobmax="4096" strict="true">debug</queue>
<queue walltimemax="01:00:00" jobmin="1" jobmax="10000" default="true">regular</queue>
<queue walltimemax="00:30:00" jobmin="1" jobmax="4096">debug</queue>
</queues>
</batch_system>

Expand All @@ -208,8 +208,8 @@
<directive> --constraint=knl,quad,cache</directive>
</directives>
<queues>
<queue walltimemax="00:30:00" jobmin="1" jobmax="100000" strict="true">debug</queue>
<queue walltimemax="01:00:00" jobmin="1" jobmax="3000000" default="true">regular</queue>
<queue walltimemax="00:30:00" jobmin="1" jobmax="100000">debug</queue>
</queues>
</batch_system>

Expand Down Expand Up @@ -329,8 +329,8 @@
<directive>-l nodes={{ num_nodes }}</directive>
</directives>
<queues>
<queue walltimemax="01:00:00" jobmin="0" jobmax="64" strict="true">debug</queue>
<queue walltimemax="24:00:00" default="true">batch</queue>
<queue walltimemax="01:00:00" jobmin="0" jobmax="64">debug</queue>
</queues>
</batch_system>

Expand Down
3 changes: 2 additions & 1 deletion config/xml_schemas/config_batch.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<xs:element minOccurs="0" ref="directives"/>

<!-- queues: The list of queue options for this machine, not all system queues need be listed
attributes of this field include walltimemin, walltimemax, jobmin and jobmax
attributes of this field include walltimemin, walltimemax, jobmin, jobmax and strict
default wallclock time for a job is the walltimemax in this field -->
<xs:element minOccurs="0" ref="queues"/>
</xs:sequence>
Expand Down Expand Up @@ -130,6 +130,7 @@
<xs:simpleContent>
<xs:extension base="xs:NCName">
<xs:attribute name="default" type="xs:boolean"/>
<xs:attribute name="strict" type="xs:boolean"/>
<xs:attribute name="jobmax" type="xs:integer"/>
<xs:attribute name="jobmin" type="xs:integer"/>
<xs:attribute name="jobname" type="xs:NCName"/>
Expand Down
5 changes: 2 additions & 3 deletions scripts/create_test
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import argparse, math, glob

logger = logging.getLogger(__name__)


###############################################################################
def parse_command_line(args, description):
###############################################################################
Expand Down Expand Up @@ -418,8 +417,8 @@ def single_submit_impl(machine_name, test_id, proc_pool, project, args, job_cost
else:
wall_time_bab = wall_time

queue = env_batch.select_best_queue(proc_pool)
wall_time_max_bab = env_batch.get_max_walltime(queue)
queue = env_batch.select_best_queue(proc_pool, wall_time_bab)
wall_time_max_bab = env_batch.get_queue_specs(queue)[3]
if wall_time_max_bab is not None:
wall_time_max = convert_to_seconds(wall_time_max_bab)
if wall_time_max < wall_time:
Expand Down
100 changes: 72 additions & 28 deletions scripts/lib/CIME/XML/env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from CIME.XML.standard_module_setup import *
from CIME.utils import format_time
from CIME.XML.env_base import EnvBase
from CIME.utils import transform_vars, get_cime_root
from CIME.utils import transform_vars, get_cime_root, convert_to_seconds
from copy import deepcopy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -181,7 +181,7 @@ def make_batch_script(self, input_template, job, case, total_tasks, tasks_per_no
fd.write(output_text)
os.chmod(job, os.stat(job).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)

def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=None):
def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=None, allow_walltime_override=False):
if self.batchtype is None:
self.batchtype = self.get_batch_system_type()

Expand All @@ -195,18 +195,35 @@ def set_job_defaults(self, batch_jobs, pesize=None, walltime=None, force_queue=N
else:
task_count = int(task_count)

queue = force_queue if force_queue is not None else self.select_best_queue(task_count, job)
self.set_value("JOB_QUEUE", queue, subgroup=job)

walltime = self.get_max_walltime(queue) if walltime is None else walltime
if walltime is None:
logger.warn("Could not find a queue matching task count %d, falling back to deprecated default walltime parameter"%task_count)
#if the user names a queue which is not defined in config_batch.xml and does not set a
#walltime, fall back to the max walltime in the default queue
if force_queue:
self.get_default_queue()
walltime = self._default_walltime
if force_queue:
if not self.queue_meets_spec(force_queue, task_count, walltime=walltime, job=job):
logger.warning("WARNING: User-requested queue '%s' does not meet requirements for job '%s'" % (force_queue, job))
else:
queue = self.select_best_queue(task_count, walltime=walltime, job=job)
if queue is None and walltime is not None:
# Try to see if walltime was the holdup
queue = self.select_best_queue(task_count, walltime=None, job=job)
if queue is not None:
# It was, override the walltime if a test, otherwise just warn the user
new_walltime = self.get_queue_specs(queue)[3]
expect(new_walltime is not None, "Should never make it here")
logger.warning("WARNING: Requested walltime '%s' could not be matched by any queue" % walltime)
if allow_walltime_override:
logger.warning(" Using walltime '%s' instead" % new_walltime)
walltime = new_walltime
else:
logger.warning(" Continuing with suspect walltime, batch submission may fail")

if queue is None:
logger.warning("WARNING: No queue on this system met the requirements for this job. Falling back to defaults")
default_queue_node = self.get_default_queue()
queue = default_queue_node.text
walltime = self.get_queue_specs(queue)[3]

walltime = self.get_queue_specs(queue)[3] if walltime is None else walltime
walltime = self._default_walltime if walltime is None else walltime # last-chance fallback

self.set_value("JOB_QUEUE", queue, subgroup=job)
self.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job)
logger.debug("Job %s queue %s walltime %s" % (job, queue, walltime))

Expand Down Expand Up @@ -362,7 +379,6 @@ def _submit_single_job(self, case, job, depid=None, no_batch=False, batch_args=N

function_name = job.replace(".", "_")
if not dry_run:
function_name = job.replace(".", "_")
locals()[function_name](case)

return
Expand Down Expand Up @@ -418,36 +434,64 @@ def get_job_id(self, output):
jobid = search_match.group(1)
return jobid

def select_best_queue(self, num_pes, job=None):
def queue_meets_spec(self, queue, num_pes, walltime=None, job=None):
    """
    Report whether the named queue is acceptable for a job.

    queue    -- queue name, looked up through self.get_queue_specs
    num_pes  -- task count compared against the queue's jobmin/jobmax
    walltime -- requested walltime; it is only compared against the
                queue's walltimemax when the queue is marked strict
    job      -- job name; when both it and the queue's jobname are set,
                the result is simply whether the two are equal

    Returns True or False.
    """
    min_pes, max_pes, queue_job, max_walltime, is_strict = self.get_queue_specs(queue)

    # With a job name on both sides the name comparison is the whole answer;
    # the size and walltime limits below are not consulted.
    if not (job is None or queue_job is None):
        return queue_job == job

    # Missing limits (None) are treated as "no limit" on that side.
    below_min = min_pes is not None and num_pes < int(min_pes)
    if below_min or (max_pes is not None and num_pes > int(max_pes)):
        return False

    # Only strict queues enforce walltimemax as a hard ceiling.
    if is_strict and walltime is not None and max_walltime is not None:
        requested_s = convert_to_seconds(walltime)
        limit_s = convert_to_seconds(max_walltime)
        return not requested_s > limit_s

    return True

def select_best_queue(self, num_pes, walltime=None, job=None):
# Make sure to check default queue first.
all_queues = []
all_queues.append( self.get_default_queue())
all_queues = all_queues + self.get_all_queues()
for queue in all_queues:
if queue is not None:
jobmin = queue.get("jobmin")
jobmax = queue.get("jobmax")
jobname = queue.get("jobname")
if jobname is not None:
if job == jobname:
return queue.text
# if the fullsum is between the min and max # jobs, then use this queue.
elif jobmin is not None and jobmax is not None and num_pes >= int(jobmin) and num_pes <= int(jobmax):
return queue.text
qname = queue.text
if self.queue_meets_spec(qname, num_pes, walltime=walltime, job=job):
return qname

return None

def get_max_walltime(self, queue):
def get_queue_specs(self, queue):
"""
Get queue specifications by name.
Returns (jobmin, jobmax, jobname, walltimemax, is_strict)
"""
for queue_node in self.get_all_queues():
if queue_node.text == queue:
return queue_node.get("walltimemax")
jobmin = queue_node.get("jobmin")
jobmax = queue_node.get("jobmax")
jobname = queue_node.get("jobname")
walltimemax = queue_node.get("walltimemax")
strict = queue_node.get("strict") == "true"

return jobmin, jobmax, jobname, walltimemax, strict

expect(False, "Queue '%s' is unknown to this system" % queue)

def get_default_queue(self):
node = self.get_optional_node("queue", attributes={"default" : "true"})
if node is None:
node = self.get_optional_node("queue")
expect(node is not None, "No queues found")
self._default_walltime = node.get("walltimemax")
return(node)
return node

def get_all_queues(self):
return self.get_nodes("queue")
Expand Down
3 changes: 2 additions & 1 deletion scripts/lib/CIME/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ def configure(self, compset_name, grid_name, machine_name=None,

env_batch.set_batch_system(batch, batch_system_type=batch_system_type)
env_batch.create_job_groups(bjobs)
env_batch.set_job_defaults(bjobs, pesize=maxval, walltime=walltime, force_queue=queue)
env_batch.set_job_defaults(bjobs, pesize=maxval, walltime=walltime, force_queue=queue, allow_walltime_override=test)
self.schedule_rewrite(env_batch)

#--------------------------------------------
Expand Down Expand Up @@ -810,6 +810,7 @@ def configure(self, compset_name, grid_name, machine_name=None,
if model == "cesm" and not test:
self.set_value("DOUT_S",True)
self.set_value("TIMER_LEVEL", 4)

if test:
self.set_value("TEST",True)

Expand Down
1 change: 0 additions & 1 deletion scripts/lib/CIME/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,6 @@ def _create_newcase_phase(self, test):
create_newcase_cmd += " --mpilib %s" % self._mpilib
logger.debug (" MPILIB set to %s" % self._mpilib)


if self._queue is not None:
create_newcase_cmd += " --queue=%s" % self._queue

Expand Down
84 changes: 84 additions & 0 deletions scripts/tests/scripts_regression_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,90 @@ def test_cime_case_xmlchange_append(self):
result = run_cmd_assert_result(self, "./xmlquery --value PIO_CONFIG_OPTS", from_dir=casedir)
self.assertEqual(result, "-opt1 -opt2")

###########################################################################
def test_cime_case_test_walltime_mgmt_1(self):
    ###########################################################################
    # Creates ERS.f19_g16_rx1.A for machine blues without passing --walltime,
    # then checks the queue and walltime recorded for the case.test job.
    if CIME.utils.get_model() != "acme":
        self.skipTest("Skipping walltime test. Depends on ACME batch settings")

    machine = "blues"
    compiler = "gnu"
    test_name = "ERS.f19_g16_rx1.A"

    # CIME_GLOBAL_WALLTIME is unset in the same shell command that runs create_test.
    create_cmd = "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s" % \
        (SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot)
    run_cmd_assert_result(self, create_cmd)

    full_name = CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler)
    casedir = os.path.join(self._testroot, "%s.%s" % (full_name, self._baseline_name))
    self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

    # (xmlquery command, expected value), checked in this order
    checks = (
        ("./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", "0:10:00"),
        ("./xmlquery JOB_QUEUE --subgroup=case.test --value", "shared"),
    )
    for query, expected in checks:
        result = run_cmd_assert_result(self, query, from_dir=casedir)
        self.assertEqual(result, expected)

###########################################################################
def test_cime_case_test_walltime_mgmt_2(self):
    ###########################################################################
    # Creates ERS_P64.f19_g16_rx1.A for machine blues without passing
    # --walltime, then checks the queue and walltime recorded for case.test.
    if CIME.utils.get_model() != "acme":
        self.skipTest("Skipping walltime test. Depends on ACME batch settings")

    machine = "blues"
    compiler = "gnu"
    test_name = "ERS_P64.f19_g16_rx1.A"

    # CIME_GLOBAL_WALLTIME is unset in the same shell command that runs create_test.
    create_cmd = "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s" % \
        (SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot)
    run_cmd_assert_result(self, create_cmd)

    full_name = CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler)
    casedir = os.path.join(self._testroot, "%s.%s" % (full_name, self._baseline_name))
    self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

    # (xmlquery command, expected value), checked in this order
    checks = (
        ("./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", "03:00:00"),
        ("./xmlquery JOB_QUEUE --subgroup=case.test --value", "batch"),
    )
    for query, expected in checks:
        result = run_cmd_assert_result(self, query, from_dir=casedir)
        self.assertEqual(result, expected)

###########################################################################
def test_cime_case_test_walltime_mgmt_3(self):
    ###########################################################################
    # Creates ERS_P64.f19_g16_rx1.A for machine blues with an explicit
    # --walltime='0:10:00', then checks the queue and walltime recorded
    # for case.test.
    if CIME.utils.get_model() != "acme":
        self.skipTest("Skipping walltime test. Depends on ACME batch settings")

    machine = "blues"
    compiler = "gnu"
    test_name = "ERS_P64.f19_g16_rx1.A"

    # CIME_GLOBAL_WALLTIME is unset in the same shell command that runs create_test.
    create_cmd = "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s --walltime='0:10:00'" % \
        (SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot)
    run_cmd_assert_result(self, create_cmd)

    full_name = CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler)
    casedir = os.path.join(self._testroot, "%s.%s" % (full_name, self._baseline_name))
    self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

    # (xmlquery command, expected value), checked in this order
    checks = (
        ("./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", "0:10:00"),
        ("./xmlquery JOB_QUEUE --subgroup=case.test --value", "batch"), # Not smart enough to select faster queue
    )
    for query, expected in checks:
        result = run_cmd_assert_result(self, query, from_dir=casedir)
        self.assertEqual(result, expected)

###########################################################################
def test_cime_case_test_walltime_mgmt_4(self):
    ###########################################################################
    # Creates ERS_P1.f19_g16_rx1.A for machine blues with an explicit
    # --walltime='2:00:00', then checks that case.test ends up in the
    # "shared" queue with a recorded walltime of 01:00:00.
    if CIME.utils.get_model() != "acme":
        self.skipTest("Skipping walltime test. Depends on ACME batch settings")

    machine = "blues"
    compiler = "gnu"
    test_name = "ERS_P1.f19_g16_rx1.A"

    # CIME_GLOBAL_WALLTIME is unset in the same shell command that runs create_test.
    create_cmd = "unset CIME_GLOBAL_WALLTIME && %s/create_test --no-setup --machine %s %s -t %s --test-root %s --output-root %s --walltime='2:00:00'" % \
        (SCRIPT_DIR, machine, test_name, self._baseline_name, self._testroot, self._testroot)
    run_cmd_assert_result(self, create_cmd)

    full_name = CIME.utils.get_full_test_name(test_name, machine=machine, compiler=compiler)
    casedir = os.path.join(self._testroot, "%s.%s" % (full_name, self._baseline_name))
    self.assertTrue(os.path.isdir(casedir), msg="Missing casedir '%s'" % casedir)

    # (xmlquery command, expected value), checked in this order
    checks = (
        ("./xmlquery JOB_WALLCLOCK_TIME --subgroup=case.test --value", "01:00:00"),
        ("./xmlquery JOB_QUEUE --subgroup=case.test --value", "shared"),
    )
    for query, expected in checks:
        result = run_cmd_assert_result(self, query, from_dir=casedir)
        self.assertEqual(result, expected)

###############################################################################
class X_TestSingleSubmit(TestCreateTestCommon):
###############################################################################
Expand Down

0 comments on commit ce52822

Please sign in to comment.