Skip to content

Commit

Permalink
Merge branch 'NOAA-EMC:develop' into feature/obsproc_prepobs_versions
Browse files Browse the repository at this point in the history
  • Loading branch information
KateFriedman-NOAA authored Sep 16, 2024
2 parents b999a17 + 2602eac commit 43c20dd
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 29 deletions.
48 changes: 48 additions & 0 deletions env/AZUREPW.env
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ if [[ "${step}" = "fcst" ]] || [[ "${step}" = "efcs" ]]; then
export APRUN_UFS="${launcher} -n ${ufs_ntasks}"
unset nnodes ufs_ntasks

elif [[ "${step}" = "waveinit" ]] || [[ "${step}" = "waveprep" ]] || [[ "${step}" = "wavepostsbs" ]] || [[ "${step}" = "wavepostbndpnt" ]] || [[ "${step}" = "wavepostbndpntbll" ]] || [[ "${step}" = "wavepostpnt" ]]; then

export CFP_MP="YES"
if [[ "${step}" = "waveprep" ]]; then export MP_PULSE=0 ; fi
export wavempexec=${launcher}
export wave_mpmd=${mpmd_opt}

elif [[ "${step}" = "post" ]]; then

export NTHREADS_NP=${NTHREADS1}
Expand All @@ -52,4 +59,45 @@ elif [[ "${step}" = "post" ]]; then
[[ ${NTHREADS_DWN} -gt ${max_threads_per_task} ]] && export NTHREADS_DWN=${max_threads_per_task}
export APRUN_DWN="${launcher} -n ${ntasks_dwn}"

elif [[ "${step}" = "atmos_products" ]]; then

export USE_CFP="YES" # Use MPMD for downstream product generation on Hera

elif [[ "${step}" = "oceanice_products" ]]; then

export NTHREADS_OCNICEPOST=${NTHREADS1}
export APRUN_OCNICEPOST="${launcher} -n 1 --cpus-per-task=${NTHREADS_OCNICEPOST}"

elif [[ "${step}" = "ecen" ]]; then

export NTHREADS_ECEN=${NTHREADSmax}
export APRUN_ECEN="${APRUN}"

export NTHREADS_CHGRES=${threads_per_task_chgres:-12}
[[ ${NTHREADS_CHGRES} -gt ${max_tasks_per_node} ]] && export NTHREADS_CHGRES=${max_tasks_per_node}
export APRUN_CHGRES="time"

export NTHREADS_CALCINC=${threads_per_task_calcinc:-1}
[[ ${NTHREADS_CALCINC} -gt ${max_threads_per_task} ]] && export NTHREADS_CALCINC=${max_threads_per_task}
export APRUN_CALCINC="${APRUN}"

elif [[ "${step}" = "esfc" ]]; then

export NTHREADS_ESFC=${NTHREADSmax}
export APRUN_ESFC="${APRUN}"

export NTHREADS_CYCLE=${threads_per_task_cycle:-14}
[[ ${NTHREADS_CYCLE} -gt ${max_tasks_per_node} ]] && export NTHREADS_CYCLE=${max_tasks_per_node}
export APRUN_CYCLE="${APRUN}"

elif [[ "${step}" = "epos" ]]; then

export NTHREADS_EPOS=${NTHREADSmax}
export APRUN_EPOS="${APRUN}"

elif [[ "${step}" = "fit2obs" ]]; then

export NTHREADS_FIT2OBS=${NTHREADS1}
export MPIRUN="${APRUN}"

fi
4 changes: 2 additions & 2 deletions env/WCOSS2.env
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ elif [[ "${step}" = "postsnd" ]]; then
export OMP_NUM_THREADS=1

export NTHREADS_POSTSND=${NTHREADS1}
export mpmd_opt="-ppn 21 ${mpmd_opt}"

export NTHREADS_POSTSNDCFP=${threads_per_task_postsndcfp:-1}
[[ ${NTHREADS_POSTSNDCFP} -gt ${max_threads_per_task} ]] && export NTHREADS_POSTSNDCFP=${max_threads_per_task}
export APRUN_POSTSNDCFP="${launcher} -np ${ntasks_postsndcfp} ${mpmd_opt}"

export mpmd_opt="-ppn ${tasks_per_node} ${mpmd_opt}"

elif [[ "${step}" = "awips" ]]; then

export NTHREADS_AWIPS=${NTHREADS1}
Expand Down
7 changes: 5 additions & 2 deletions jobs/JGLOBAL_ATMOS_UPP
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "upp" -c "base upp"
##############################################

# Construct COM variables from templates
YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COM_ATMOS_ANALYSIS COM_ATMOS_HISTORY COM_ATMOS_MASTER
if [[ ! -d ${COM_ATMOS_MASTER} ]]; then mkdir -m 775 -p "${COM_ATMOS_MASTER}"; fi
YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \
COMIN_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \
COMIN_ATMOS_HISTORY:COM_ATMOS_HISTORY_TMPL \
COMOUT_ATMOS_MASTER:COM_ATMOS_MASTER_TMPL
if [[ ! -d ${COMOUT_ATMOS_MASTER} ]]; then mkdir -p "${COMOUT_ATMOS_MASTER}"; fi


###############################################################
Expand Down
2 changes: 1 addition & 1 deletion parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arc
export NUM_SND_COLLECTIVES=${NUM_SND_COLLECTIVES:-9}

# The tracker, genesis, and METplus jobs are not supported on CSPs yet
# TODO: we should place these in workflow/hosts/[csp]pw.yaml as part of AWS/AZURE/GOOGLE setup, not for general.
# TODO: we should place these in workflow/hosts/[aws|azure|google]pw.yaml as part of CSP's setup, not for general.
if [[ "${machine}" =~ "PW" ]]; then
export DO_WAVE="NO"
fi
Expand Down
1 change: 1 addition & 0 deletions parm/config/gefs/config.cleanup
12 changes: 12 additions & 0 deletions parm/config/gefs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ case ${machine} in
export PARTITION_BATCH="compute"
max_tasks_per_node=36
;;
"AZUREPW")
export PARTITION_BATCH="compute"
max_tasks_per_node=24
;;
"GOOGLEPW")
export PARTITION_BATCH="compute"
max_tasks_per_node=32
Expand Down Expand Up @@ -291,6 +295,14 @@ case ${step} in
export threads_per_task=1
export memory="4096M"
;;

"cleanup")
export walltime="00:15:00"
export ntasks=1
export tasks_per_node=1
export threads_per_task=1
export memory="4096M"
;;
*)
echo "FATAL ERROR: Invalid job ${step} passed to ${BASH_SOURCE[0]}"
exit 1
Expand Down
11 changes: 11 additions & 0 deletions parm/config/gefs/config.resources.AZUREPW
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#! /usr/bin/env bash

# AZURE-specific job resources

export is_exclusive="True"
unset memory

# shellcheck disable=SC2312
for mem_var in $(env | grep '^memory_' | cut -d= -f1); do
unset "${mem_var}"
done
34 changes: 32 additions & 2 deletions parm/config/gfs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ case ${step} in
case ${OCNRES} in
"025") ntasks=480;;
"050") ntasks=16;;
"100") ntasks=16;;
"500") ntasks=16;;
*)
echo "FATAL ERROR: Resources not defined for job ${step} at resolution ${OCNRES}"
Expand All @@ -550,6 +551,10 @@ case ${step} in
ntasks=16
memory="96GB"
;;
"100")
ntasks=16
memory="96GB"
;;
"500")
ntasks=16
memory="24GB"
Expand All @@ -576,6 +581,10 @@ case ${step} in
ntasks=16
memory="96GB"
;;
"100")
ntasks=16
memory="96GB"
;;
"500")
ntasks=16
memory="24GB"
Expand All @@ -602,6 +611,10 @@ case ${step} in
ntasks=16
memory="96GB"
;;
"100")
ntasks=16
memory="96GB"
;;
"500")
ntasks=16
memory="24GB"
Expand Down Expand Up @@ -630,6 +643,9 @@ case ${step} in
"050")
memory="32GB"
ntasks=16;;
"100")
memory="32GB"
ntasks=16;;
"500")
memory="32GB"
ntasks=8;;
Expand Down Expand Up @@ -1192,9 +1208,23 @@ case ${step} in
"postsnd")
walltime="02:00:00"
export ntasks=141
threads_per_task=6
export tasks_per_node=21
export ntasks_postsndcfp=9
case ${CASE} in
"C768")
tasks_per_node=21
threads_per_task=6
memory="23GB"
;;
"C1152")
tasks_per_node=9
threads_per_task=14
memory="50GB"
;;
*)
tasks_per_node=21
threads_per_task=6
;;
esac
export tasks_per_node_postsndcfp=1
postsnd_req_cores=$(( tasks_per_node * threads_per_task ))
if (( postsnd_req_cores > max_tasks_per_node )); then
Expand Down
1 change: 1 addition & 0 deletions parm/config/gfs/config.resources.GAEA
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ case ${step} in

esac

unset memory
# shellcheck disable=SC2312
for mem_var in $(env | grep '^memory_' | cut -d= -f1); do
unset "${mem_var}"
Expand Down
28 changes: 14 additions & 14 deletions parm/post/upp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ analysis:
data_in:
copy:
- ["{{ PARMgfs }}/post/gfs/postxconfig-NT-gfs-anl.txt", "{{ DATA }}/postxconfig-NT.txt"]
- ["{{ COM_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.atmanl.nc", "{{ DATA }}/{{ atmos_filename }}"]
- ["{{ COM_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfcanl.nc", "{{ DATA }}/{{ flux_filename }}"]
- ["{{ COMIN_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.atmanl.nc", "{{ DATA }}/{{ atmos_filename }}"]
- ["{{ COMIN_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfcanl.nc", "{{ DATA }}/{{ flux_filename }}"]
data_out:
copy:
- ["{{ DATA }}/GFSPRS.GrbF00", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2anl"]
- ["{{ DATA }}/GFSPRS.GrbF00.idx", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2ianl"]
- ["{{ DATA }}/GFSPRS.GrbF00", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2anl"]
- ["{{ DATA }}/GFSPRS.GrbF00.idx", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2ianl"]

forecast:
config:
Expand All @@ -36,14 +36,14 @@ forecast:
{% else %}
- ["{{ PARMgfs }}/post/gfs/postxconfig-NT-gfs-two.txt", "{{ DATA }}/postxconfig-NT.txt"]
{% endif %}
- ["{{ COM_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.atmf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ atmos_filename }}"]
- ["{{ COM_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfcf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ flux_filename }}"]
- ["{{ COMIN_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.atmf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ atmos_filename }}"]
- ["{{ COMIN_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfcf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ flux_filename }}"]
data_out:
copy:
- ["{{ DATA }}/GFSPRS.GrbF{{ '%02d' % forecast_hour }}", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2f{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSFLX.GrbF{{ '%02d' % forecast_hour }}", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfluxgrbf{{ '%03d' % forecast_hour }}.grib2"]
- ["{{ DATA }}/GFSPRS.GrbF{{ '%02d' % forecast_hour }}.idx", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2if{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSFLX.GrbF{{ '%02d' % forecast_hour }}.idx", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfluxgrbf{{ '%03d' % forecast_hour }}.grib2.idx"]
- ["{{ DATA }}/GFSPRS.GrbF{{ '%02d' % forecast_hour }}", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2f{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSFLX.GrbF{{ '%02d' % forecast_hour }}", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfluxgrbf{{ '%03d' % forecast_hour }}.grib2"]
- ["{{ DATA }}/GFSPRS.GrbF{{ '%02d' % forecast_hour }}.idx", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.master.grb2if{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSFLX.GrbF{{ '%02d' % forecast_hour }}.idx", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfluxgrbf{{ '%03d' % forecast_hour }}.grib2.idx"]

goes:
config:
Expand Down Expand Up @@ -82,9 +82,9 @@ goes:
- ["{{ 'CRTM_FIX' | getenv }}/AerosolCoeff.bin", "{{ DATA }}/"]
- ["{{ 'CRTM_FIX' | getenv }}/CloudCoeff.bin", "{{ DATA }}/"]
- ["{{ PARMgfs }}/post/gfs/postxconfig-NT-gfs-goes.txt", "{{ DATA }}/postxconfig-NT.txt"]
- ["{{ COM_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.atmf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ atmos_filename }}"]
- ["{{ COM_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfcf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ flux_filename }}"]
- ["{{ COMIN_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.atmf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ atmos_filename }}"]
- ["{{ COMIN_ATMOS_HISTORY }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.sfcf{{ '%03d' % forecast_hour }}.nc", "{{ DATA }}/{{ flux_filename }}"]
data_out:
copy:
- ["{{ DATA }}/GFSGOES.GrbF{{ '%02d' % forecast_hour }}", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.special.grb2f{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSGOES.GrbF{{ '%02d' % forecast_hour }}.idx", "{{ COM_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.special.grb2if{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSGOES.GrbF{{ '%02d' % forecast_hour }}", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.special.grb2f{{ '%03d' % forecast_hour }}"]
- ["{{ DATA }}/GFSGOES.GrbF{{ '%02d' % forecast_hour }}.idx", "{{ COMOUT_ATMOS_MASTER }}/{{ RUN }}.t{{ current_cycle | strftime('%H') }}z.special.grb2if{{ '%03d' % forecast_hour }}"]
2 changes: 1 addition & 1 deletion scripts/exglobal_atmos_upp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def main():

# Pull out all the configuration keys needed to run the rest of UPP steps
keys = ['HOMEgfs', 'DATA', 'current_cycle', 'RUN', 'NET',
'COM_ATMOS_ANALYSIS', 'COM_ATMOS_HISTORY', 'COM_ATMOS_MASTER',
'COMIN_ATMOS_ANALYSIS', 'COMIN_ATMOS_HISTORY', 'COMOUT_ATMOS_MASTER',
'upp_run',
'APRUN_UPP',
'forecast_hour', 'valid_datetime',
Expand Down
2 changes: 1 addition & 1 deletion sorc/build_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ build_opts["ww3prepost"]="${_wave_opt} ${_verbose_opt} ${_build_ufs_opt} ${_buil

# Optional DA builds
if [[ "${_build_ufsda}" == "YES" ]]; then
if [[ "${MACHINE_ID}" != "orion" && "${MACHINE_ID}" != "hera" && "${MACHINE_ID}" != "hercules" && "${MACHINE_ID}" != "wcoss2" && "${MACHINE_ID}" != "noaacloud" ]]; then
if [[ "${MACHINE_ID}" != "orion" && "${MACHINE_ID}" != "hera" && "${MACHINE_ID}" != "hercules" && "${MACHINE_ID}" != "wcoss2" && "${MACHINE_ID}" != "noaacloud" && "${MACHINE_ID}" != "gaea" ]]; then
echo "NOTE: The GDAS App is not supported on ${MACHINE_ID}. Disabling build."
else
build_jobs["gdas"]=8
Expand Down
6 changes: 3 additions & 3 deletions ush/python/pygfs/task/upp.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def _call_executable(exec_cmd: Executable) -> None:
@logit(logger)
def finalize(upp_run: Dict, upp_yaml: Dict) -> None:
"""Perform closing actions of the task.
Copy data back from the DATA/ directory to COM/
Copy data back from the DATA/ directory to COMOUT/
Parameters
----------
Expand All @@ -259,6 +259,6 @@ def finalize(upp_run: Dict, upp_yaml: Dict) -> None:
Fully resolved upp.yaml dictionary
"""

# Copy "upp_run" specific generated data to COM/ directory
logger.info(f"Copy '{upp_run}' processed data to COM/ directory")
# Copy "upp_run" specific generated data to COMOUT/ directory
logger.info(f"Copy '{upp_run}' processed data to COMOUT/ directory")
FileHandler(upp_yaml[upp_run].data_out).sync()
4 changes: 2 additions & 2 deletions workflow/applications/gefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _get_app_configs(self):
"""
Returns the config_files that are involved in gefs
"""
configs = ['stage_ic', 'fcst', 'atmos_products', 'arch']
configs = ['stage_ic', 'fcst', 'atmos_products', 'arch', 'cleanup']

if self.nens > 0:
configs += ['efcs', 'atmos_ensstat']
Expand Down Expand Up @@ -82,6 +82,6 @@ def get_task_names(self):
if self.do_extractvars:
tasks += ['extractvars']

tasks += ['arch']
tasks += ['arch', 'cleanup']

return {f"{self.run}": tasks}
2 changes: 1 addition & 1 deletion workflow/hosts/azurepw.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CHGRP_RSTPROD: 'YES'
CHGRP_CMD: 'chgrp rstprod' # TODO: This is not yet supported.
HPSSARCH: 'NO'
HPSS_PROJECT: emc-global #TODO: See `ATARDIR` below.
BASE_CPLIC: '/bucket/global-workflow-shared-data/ICSDIR/prototype_ICs'
BASE_IC: '/bucket/global-workflow-shared-data/ICSDIR'
LOCALARCH: 'NO'
ATARDIR: '' # TODO: This will not yet work from AZURE.
MAKE_NSSTBUFR: 'NO'
Expand Down
27 changes: 27 additions & 0 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ def arch(self):
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': 'wave_post_bndpnt_bull'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_extractvars:
dep_dict = {'type': 'metatask', 'name': 'extractvars'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps, dep_condition='and')

resources = self.get_resource('arch')
Expand All @@ -564,3 +567,27 @@ def arch(self):
task = rocoto.create_task(task_dict)

return task

def cleanup(self):
deps = []
dep_dict = {'type': 'task', 'name': 'arch'}
deps.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('cleanup')
task_name = 'cleanup'
task_dict = {'task_name': task_name,
'resources': resources,
'envars': self.envars,
'cycledef': 'gefs',
'dependency': dependencies,
'command': f'{self.HOMEgfs}/jobs/rocoto/cleanup.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}

task = rocoto.create_task(task_dict)

return task

0 comments on commit 43c20dd

Please sign in to comment.