From 52781c65210911096fb8e56d079fab0122e876c0 Mon Sep 17 00:00:00 2001 From: "kate.friedman" Date: Tue, 12 Dec 2023 18:42:15 +0000 Subject: [PATCH] Add MOS jobs and new prepost flag to rocoto setup - Add MOS jobs to rocoto workflow setup - Add MOS jobs to archive dependencies using for loop - Add new prepost flag to setup for use on WCOSS2; when "prepost=true" is set for a job in config.resources an additional flag is included at the end of the PBS select line via inclusion in the tag in the rocoto xml Refs #2068 --- workflow/applications/applications.py | 1 + workflow/applications/gfs_cycled.py | 12 ++ workflow/applications/gfs_forecast_only.py | 12 ++ workflow/rocoto/gfs_tasks.py | 190 +++++++++++++++++++++ workflow/rocoto/tasks.py | 7 +- 5 files changed, 221 insertions(+), 1 deletion(-) diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index 441dbe4c19..766d4aa508 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -62,6 +62,7 @@ def __init__(self, conf: Configuration) -> None: self.do_genesis_fsu = _base.get('DO_GENESIS_FSU', False) self.do_metp = _base.get('DO_METP', False) self.do_upp = not _base.get('WRITE_DOPOST', True) + self.do_mos = _base.get('DO_MOS', False) self.do_hpssarch = _base.get('HPSSARCH', False) diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index 63332c0cf6..6f70ca85b2 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -104,6 +104,12 @@ def _get_app_configs(self): if self.do_jedilandda: configs += ['preplandobs', 'landanl'] + if self.do_mos: + configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep'] + configs += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst'] + configs += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen'] + configs += ['mos_wx_prdgen', 'mos_wx_ext_prdgen'] + return configs @staticmethod @@ -238,6 +244,12 @@ def get_task_names(self): if self.do_awips: gfs_tasks += ['awips_20km_1p0deg', 'awips_g2', 'fbwinds'] + if self.do_mos: + gfs_tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep'] + gfs_tasks += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst'] + gfs_tasks += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen'] + gfs_tasks += ['mos_wx_prdgen', 'mos_wx_ext_prdgen'] + gfs_tasks += gdas_gfs_common_cleanup_tasks tasks = dict() diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py index 701ec12a04..7327a2d11b 100644 --- a/workflow/applications/gfs_forecast_only.py +++ b/workflow/applications/gfs_forecast_only.py @@ -60,6 +60,12 @@ def _get_app_configs(self): if self.do_awips: configs += ['waveawipsbulls', 'waveawipsgridded'] + if self.do_mos: + configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep'] + configs += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst'] + configs += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen'] + configs += ['mos_wx_prdgen', 'mos_wx_ext_prdgen'] + return configs @staticmethod @@ -129,6 +135,12 @@ def get_task_names(self): if self.do_awips: tasks += ['waveawipsbulls', 'waveawipsgridded'] + if self.do_mos: + tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep'] + tasks += ['mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst'] + tasks += ['mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen'] + tasks += ['mos_wx_prdgen', 'mos_wx_ext_prdgen'] + tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks return {f"{self._base['CDUMP']}": tasks} diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 047c174cdb..d4ecf8e4d5 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1091,6 +1091,187 @@ def metp(self): return task + def mos_stn_prep(self): + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_stn_prep') + task = create_wf_task('mos_stn_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_grd_prep(self): + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_grd_prep') + task = create_wf_task('mos_grd_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_ext_stn_prep(self): + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_ext_stn_prep') + task = create_wf_task('mos_ext_stn_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_ext_grd_prep(self): + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{self.cdump}atmprod'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_ext_grd_prep') + task = create_wf_task('mos_ext_grd_prep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_stn_fcst(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_stn_fcst') + task = create_wf_task('mos_stn_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_grd_fcst(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_grd_fcst') + task = create_wf_task('mos_grd_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_ext_stn_fcst(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_ext_stn_fcst') + task = create_wf_task('mos_ext_stn_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_ext_grd_fcst(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_grd_prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_ext_grd_fcst') + task = create_wf_task('mos_ext_grd_fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_stn_prdgen(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_stn_prdgen') + task = create_wf_task('mos_stn_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_grd_prdgen(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_grd_prdgen') + task = create_wf_task('mos_grd_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_ext_stn_prdgen(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_stn_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_ext_stn_prdgen') + task = create_wf_task('mos_ext_stn_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_ext_grd_prdgen(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_grd_fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_stn_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_ext_grd_prdgen') + task = create_wf_task('mos_ext_grd_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_wx_prdgen(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_grd_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('mos_wx_prdgen') + task = create_wf_task('mos_wx_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def mos_wx_ext_prdgen(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_ext_grd_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_wx_prdgen'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('mos_wx_ext_prdgen') + task = create_wf_task('mos_wx_ext_prdgen', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + def arch(self): deps = [] dependencies = [] @@ -1145,6 +1326,15 @@ def arch(self): if self.app_config.mode in ['forecast-only']: # TODO: fix ocnpost to run in cycled mode dep_dict = {'type': 'metatask', 'name': f'{self.cdump}ocnpost'} deps.append(rocoto.add_dependency(dep_dict)) + # MOS job dependencies + if self.cdump in ['gfs'] and self.app_config.do_mos: + mos_jobs = ["stn_prep", "grd_prep", "ext_stn_prep", "ext_grd_prep", + "stn_fcst", "grd_fcst", "ext_stn_fcst", "ext_grd_fcst", + "stn_prdgen", "grd_prdgen", "ext_stn_prdgen", "ext_grd_prdgen", + "wx_prdgen", "wx_ext_prdgen"] + for job in mos_jobs: + dep_dict = {'type': 'task', 'name': f'{self.cdump}mos_{job}'} + deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps + dependencies) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index 06000338be..8381376777 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -29,7 +29,10 @@ class Tasks: 'gempak', 'gempakmeta', 'gempakmetancdc', 'gempakncdcupapgif', 'gempakpgrb2spec', 'npoess_pgrb2_0p5deg' 'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep', - 'npoess'] + 'npoess', + 'mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', + 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', + 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] def __init__(self, app_config: AppConfig, cdump: str) -> None: @@ -149,6 +152,8 @@ def get_resource(self, task_name): threads = task_config[f'nth_{task_name}_gfs'] memory = task_config.get(f'memory_{task_name}', None) + if task_config.get('prepost', False): + memory += ':prepost=true' native = None if scheduler in ['pbspro']: