Skip to content

Commit

Permalink
Add MOS jobs and new prepost flag to rocoto setup
Browse files Browse the repository at this point in the history
- Add MOS jobs to rocoto workflow setup
- Add MOS jobs to archive dependencies using for loop
- Add new prepost flag to setup for use on WCOSS2;
when "prepost=true" is set for a job in config.resources an
additional flag is included at the end of the PBS select line
via inclusion in the <memory> tag in the rocoto xml

Refs NOAA-EMC#2068
  • Loading branch information
KateFriedman-NOAA committed Dec 12, 2023
1 parent 9b26bf0 commit 52781c6
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 1 deletion.
1 change: 1 addition & 0 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, conf: Configuration) -> None:
self.do_genesis_fsu = _base.get('DO_GENESIS_FSU', False)
self.do_metp = _base.get('DO_METP', False)
self.do_upp = not _base.get('WRITE_DOPOST', True)
self.do_mos = _base.get('DO_MOS', False)

self.do_hpssarch = _base.get('HPSSARCH', False)

Expand Down
12 changes: 12 additions & 0 deletions workflow/applications/gfs_cycled.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ def _get_app_configs(self):
if self.do_jedilandda:
configs += ['preplandobs', 'landanl']

if self.do_mos:
configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep']
configs += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst']
configs += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen']
configs += ['mos_wx_prdgen', 'mos_wx_ext_prdgen']

return configs

@staticmethod
Expand Down Expand Up @@ -238,6 +244,12 @@ def get_task_names(self):
if self.do_awips:
gfs_tasks += ['awips_20km_1p0deg', 'awips_g2', 'fbwinds']

if self.do_mos:
gfs_tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep']
gfs_tasks += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst']
gfs_tasks += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen']
gfs_tasks += ['mos_wx_prdgen', 'mos_wx_ext_prdgen']

gfs_tasks += gdas_gfs_common_cleanup_tasks

tasks = dict()
Expand Down
12 changes: 12 additions & 0 deletions workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def _get_app_configs(self):
if self.do_awips:
configs += ['waveawipsbulls', 'waveawipsgridded']

if self.do_mos:
configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep']
configs += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst']
configs += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen']
configs += ['mos_wx_prdgen', 'mos_wx_ext_prdgen']

return configs

@staticmethod
Expand Down Expand Up @@ -129,6 +135,12 @@ def get_task_names(self):
if self.do_awips:
tasks += ['waveawipsbulls', 'waveawipsgridded']

if self.do_mos:
tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep']
tasks += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst']
tasks += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen']
tasks += ['mos_wx_prdgen', 'mos_wx_ext_prdgen']

tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks

return {f"{self._base['CDUMP']}": tasks}
190 changes: 190 additions & 0 deletions workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,187 @@ def metp(self):

return task

def mos_stn_prep(self):
deps = []
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_stn_prep')
task = create_wf_task('mos_stn_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_grd_prep(self):
deps = []
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_grd_prep')
task = create_wf_task('mos_grd_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_ext_stn_prep(self):
deps = []
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_ext_stn_prep')
task = create_wf_task('mos_ext_stn_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_ext_grd_prep(self):
deps = []
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_ext_grd_prep')
task = create_wf_task('mos_ext_grd_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_stn_fcst(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prep'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_stn_fcst')
task = create_wf_task('mos_stn_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_grd_fcst(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prep'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_prep'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_grd_fcst')
task = create_wf_task('mos_grd_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_ext_stn_fcst(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_prep'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_ext_stn_fcst')
task = create_wf_task('mos_ext_stn_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_ext_grd_fcst(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_prep'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_grd_prep'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_ext_grd_fcst')
task = create_wf_task('mos_ext_grd_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_stn_prdgen(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_stn_prdgen')
task = create_wf_task('mos_stn_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_grd_prdgen(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_grd_prdgen')
task = create_wf_task('mos_grd_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_ext_stn_prdgen(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_ext_stn_prdgen')
task = create_wf_task('mos_ext_stn_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_ext_grd_prdgen(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_grd_fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_ext_grd_prdgen')
task = create_wf_task('mos_ext_grd_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_wx_prdgen(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('mos_wx_prdgen')
task = create_wf_task('mos_wx_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def mos_wx_ext_prdgen(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_grd_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_wx_prdgen'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('mos_wx_ext_prdgen')
task = create_wf_task('mos_wx_ext_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def arch(self):
deps = []
dependencies = []
Expand Down Expand Up @@ -1145,6 +1326,15 @@ def arch(self):
if self.app_config.mode in ['forecast-only']: # TODO: fix ocnpost to run in cycled mode
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}ocnpost'}
deps.append(rocoto.add_dependency(dep_dict))
# MOS job dependencies
if self.cdump in ['gfs'] and self.app_config.do_mos:
mos_jobs = ["stn_prep", "grd_prep", "ext_stn_prep", "ext_grd_prep",
"stn_fcst", "grd_fcst", "ext_stn_fcst", "ext_grd_fcst",
"stn_prdgen", "grd_prdgen", "ext_stn_prdgen", "ext_grd_prdgen",
"wx_prdgen", "wx_ext_prdgen"]
for job in mos_jobs:
dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_{job}'}
deps.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep_condition='and', dep=deps + dependencies)

Expand Down
7 changes: 6 additions & 1 deletion workflow/rocoto/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ class Tasks:
'gempak', 'gempakmeta', 'gempakmetancdc', 'gempakncdcupapgif', 'gempakpgrb2spec', 'npoess_pgrb2_0p5deg'
'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit',
'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep',
'npoess']
'npoess',
'mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep',
'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst',
'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen']

def __init__(self, app_config: AppConfig, cdump: str) -> None:

Expand Down Expand Up @@ -149,6 +152,8 @@ def get_resource(self, task_name):
threads = task_config[f'nth_{task_name}_gfs']

memory = task_config.get(f'memory_{task_name}', None)
if task_config.get('prepost', False):
memory += ':prepost=true'

native = None
if scheduler in ['pbspro']:
Expand Down

0 comments on commit 52781c6

Please sign in to comment.