From 6f5fa7949299ef70b5e846697461b8538aef6a5d Mon Sep 17 00:00:00 2001 From: Walter Kolczynski - NOAA Date: Wed, 19 Oct 2022 23:37:45 -0400 Subject: [PATCH] Use more cycledefs for task control (#1078) Splits the existing rocoto cycle definitions up to offer better job control. This means that only the jobs that are due to run will appear in a cycle's job list from rocotostat/rocotoviewer. It also allows for the removal of some of the cycleexist dependencies that were there solely to prevent the job from running in the half cycle. A side effect of this change is that the half-cycle will be recognized as a completed cycle, fixing the bug with archive jobs starting in the fourth cycle (#1003). The gdas cycledef has been split into a `gdas_half` for the first half- cycle and `gdas` for the other GDAS cycles. Tasks that run during that first half-cycle therefore run on two cycledefs. For gfs, instead of slicing perpindicular to time, a new cycledef `gfs_cont` (continuity) was created in parallel to the existing gfs cycledef that omits the first cycle. This was done since only one job (`aerosol_init`) currently skips the first cycle, and this prevents the need to provide two cycledefs for every gfs task but one. Since some time math is now being done on sdate in workflow_xml.py, we now keep those as datetime objects and only convert to string when writing the cycledef strings. In order to access the pygw utilities in the workflow directory, a symlink is created in `workflow` pointing to the pygw location in `ush`. A better solution may be found in the future. Fixes #1003 --- workflow/pygw | 1 + workflow/rocoto/workflow_tasks.py | 52 ++++++++++++++++++------------- workflow/rocoto/workflow_xml.py | 39 +++++++++++++++-------- 3 files changed, 57 insertions(+), 35 deletions(-) create mode 120000 workflow/pygw diff --git a/workflow/pygw b/workflow/pygw new file mode 120000 index 0000000000..dfa1d9a164 --- /dev/null +++ b/workflow/pygw @@ -0,0 +1 @@ +../ush/python/pygw/src/pygw \ No newline at end of file diff --git a/workflow/rocoto/workflow_tasks.py b/workflow/rocoto/workflow_tasks.py index 5ec1dbb39c..edb35af513 100644 --- a/workflow/rocoto/workflow_tasks.py +++ b/workflow/rocoto/workflow_tasks.py @@ -310,10 +310,6 @@ def aerosol_init(self): interval = self._base['INTERVAL'] offset = f'-{interval}' - # Previous cycle - dep_dict = {'type': 'cycleexist', 'offset': offset} - deps.append(rocoto.add_dependency(dep_dict)) - # Files from previous cycle files = [f'@Y@m@d.@H0000.fv_core.res.nc'] + \ [f'@Y@m@d.@H0000.fv_core.res.tile{tile}.nc' for tile in range(1, self.n_tiles + 1)] + \ @@ -326,8 +322,10 @@ def aerosol_init(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + cycledef = 'gfs_seq' resources = self.get_resource('aerosol_init') - task = create_wf_task('aerosol_init', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('aerosol_init', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -387,8 +385,6 @@ def analdiag(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}anal'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('analdiag') @@ -446,8 +442,6 @@ def atmanalpost(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalrun'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('atmanalpost') @@ -459,7 +453,6 @@ def aeroanlinit(self): suffix = self._base["SUFFIX"] dump_suffix = self._base["DUMP_SUFFIX"] - gfs_cyc = self._base["gfs_cyc"] dmpdir = self._base["DMPDIR"] deps = [] @@ -494,8 +487,6 @@ def aeroanlfinal(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}aeroanlrun'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('aeroanlfinal') @@ -508,8 +499,6 @@ def gldas(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}sfcanl'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('gldas') @@ -604,8 +593,11 @@ def _fcst_cycled(self): dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=dependencies) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('fcst') - task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -679,9 +671,11 @@ def _get_postgroups(cdump, config, add_anl=False): varval1, varval2, varval3 = _get_postgroups(self.cdump, self._configs[task_name], add_anl=add_anl_to_post) vardict = {varname2: varval2, varname3: varval3} + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource(task_name) task = create_wf_task(task_name, resources, cdump=self.cdump, envar=postenvars, dependency=dependencies, - metatask=task_name, varname=varname1, varval=varval1, vardict=vardict) + metatask=task_name, varname=varname1, varval=varval1, vardict=vardict, cycledef=cycledef) return task @@ -913,8 +907,11 @@ def vrfy(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('vrfy') - task = create_wf_task('vrfy', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('vrfy', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -961,8 +958,11 @@ def arch(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('arch') - task = create_wf_task('arch', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('arch', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -1171,9 +1171,10 @@ def efcs(self): groups = self._get_hybgroups(self._base['NMEM_ENKF'], self._configs['efcs']['NMEM_EFCSGRP']) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump resources = self.get_resource('efcs') task = create_wf_task('efcs', resources, cdump=self.cdump, envar=efcsenvars, dependency=dependencies, - metatask='efmn', varname='grp', varval=groups) + metatask='efmn', varname='grp', varval=groups, cycledef=cycledef) return task @@ -1188,8 +1189,11 @@ def echgres(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('echgres') - task = create_wf_task('echgres', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('echgres', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -1231,9 +1235,11 @@ def _get_eposgroups(epos): varval1, varval2, varval3 = _get_eposgroups(self._configs['epos']) vardict = {varname2: varval2, varname3: varval3} + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('epos') task = create_wf_task('epos', resources, cdump=self.cdump, envar=eposenvars, dependency=dependencies, - metatask='epmn', varname=varname1, varval=varval1, vardict=vardict) + metatask='epmn', varname=varname1, varval=varval1, vardict=vardict, cycledef=cycledef) return task @@ -1251,9 +1257,11 @@ def earc(self): groups = self._get_hybgroups(self._base['NMEM_ENKF'], self._configs['earc']['NMEM_EARCGRP'], start_index=0) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('earc') task = create_wf_task('earc', resources, cdump=self.cdump, envar=earcenvars, dependency=dependencies, - metatask='eamn', varname='grp', varval=groups) + metatask='eamn', varname='grp', varval=groups, cycledef=cycledef) return task diff --git a/workflow/rocoto/workflow_xml.py b/workflow/rocoto/workflow_xml.py index 440ff93db5..52ff86db2c 100644 --- a/workflow/rocoto/workflow_xml.py +++ b/workflow/rocoto/workflow_xml.py @@ -3,6 +3,7 @@ import os from distutils.spawn import find_executable from datetime import datetime +from pygw.timetools import to_timedelta from collections import OrderedDict from applications import AppConfig from rocoto.workflow_tasks import get_wf_tasks @@ -110,29 +111,41 @@ def _get_cycledefs(self): return cycledefs def _get_cycledefs_cycled(self): - sdate = self._base['SDATE'].strftime('%Y%m%d%H%M') - edate = self._base['EDATE'].strftime('%Y%m%d%H%M') + sdate = self._base['SDATE'] + edate = self._base['EDATE'] interval = self._base.get('INTERVAL', '06:00:00') - strings = [f'\t{sdate} {edate} {interval}\n'] + strings = [] + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {sdate.strftime("%Y%m%d%H%M")} {interval}') + sdate = sdate + to_timedelta(interval) + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}') if self._app_config.gfs_cyc != 0: - sdate_gfs = self._base['SDATE_GFS'].strftime('%Y%m%d%H%M') - edate_gfs = self._base['EDATE_GFS'].strftime('%Y%m%d%H%M') + sdate_gfs = self._base['SDATE_GFS'] + edate_gfs = self._base['EDATE_GFS'] interval_gfs = self._base['INTERVAL_GFS'] - strings.append(f'\t{sdate_gfs} {edate_gfs} {interval_gfs}') - strings.append('') - strings.append('') + strings.append(f'\t{sdate_gfs.strftime("%Y%m%d%H%M")} {edate_gfs.strftime("%Y%m%d%H%M")} {interval_gfs}') + + sdate_gfs = sdate_gfs + to_timedelta(interval_gfs) + if sdate_gfs <= edate_gfs: + strings.append(f'\t{sdate_gfs.strftime("%Y%m%d%H%M")} {edate_gfs.strftime("%Y%m%d%H%M")} {interval_gfs}') + + strings.append('') + strings.append('') return '\n'.join(strings) def _get_cycledefs_forecast_only(self): - sdate = self._base['SDATE'].strftime('%Y%m%d%H%M') - edate = self._base['EDATE'].strftime('%Y%m%d%H%M') + sdate = self._base['SDATE'] + edate = self._base['EDATE'] interval = self._base.get('INTERVAL_GFS', '24:00:00') - cdump = self._base['CDUMP'] - strings = f'\t{sdate} {edate} {interval}\n\n' + strings = [] + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}') + + sdate = sdate + to_timedelta(interval) + if sdate <= edate: + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}') - return strings + return '\n'.join(strings) @staticmethod def _get_workflow_footer():