diff --git a/esmvaltool/diag_scripts/hydrology/compute_chunks.py b/esmvaltool/diag_scripts/hydrology/compute_chunks.py new file mode 100644 index 0000000000..56b3f7b43d --- /dev/null +++ b/esmvaltool/diag_scripts/hydrology/compute_chunks.py @@ -0,0 +1,40 @@ +"""Re-chunk the time dimension, to be used by the regrid processor. + +For large cubes, regridding to a high resolution grid increases the size +of the data. To reduce memory use, we re-chunk the time dimension. + +Related iris issue: +https://github.com/SciTools/iris/issues/3808 +""" +import numpy as np + + +def compute_chunks(src, tgt): + """Compute the chunk sizes needed to regrid src to tgt.""" + block_bytes = 50 * (1 << 20) # 50 MB block size + + if src.dtype == np.float32: + dtype_bytes = 4 # size of float32 in bytes + else: + dtype_bytes = 8 # size of float64 in bytes + + ntime = src.coord('time').shape[0] + tgt_nlat = tgt.coord('latitude').shape[0] + tgt_nlon = tgt.coord('longitude').shape[0] + + # Define blocks along the time dimension + min_nblocks = int(ntime * tgt_nlat * tgt_nlon * dtype_bytes / block_bytes) + min_nblocks = max(min_nblocks, 1) + timefull = ntime // min_nblocks + timepart = ntime % timefull + + nfullblocks = ntime // timefull + npartblocks = int(timepart > 0) + + time_chunks = (timefull, ) * nfullblocks + (timepart, ) * npartblocks + src_chunks = ( + time_chunks, + (src.coord('latitude').shape[0], ), + (src.coord('longitude').shape[0], ), + ) + return src_chunks diff --git a/esmvaltool/diag_scripts/hydrology/lazy_regrid.py b/esmvaltool/diag_scripts/hydrology/lazy_regrid.py deleted file mode 100644 index e62f5bf6c1..0000000000 --- a/esmvaltool/diag_scripts/hydrology/lazy_regrid.py +++ /dev/null @@ -1,103 +0,0 @@ -"""Lazy regridding, because this is not supported by iris (yet). - -Iris issue requesting the feature: -https://github.com/SciTools/iris/issues/3700 -""" -import copy - -import iris -import numpy as np - -HORIZONTAL_SCHEMES = { - 'linear': iris.analysis.Linear(extrapolation_mode='mask'), - 'linear_extrapolate': - iris.analysis.Linear(extrapolation_mode='extrapolate'), - 'nearest': iris.analysis.Nearest(extrapolation_mode='mask'), - 'area_weighted': iris.analysis.AreaWeighted(), -} -"""Supported horizontal regridding schemes.""" - - -def _compute_chunks(src, tgt): - """Compute the chunk sizes needed to regrid src to tgt.""" - block_bytes = 50 * (1 << 20) # 50 MB block size - - if src.dtype == np.float32: - dtype_bytes = 4 # size of float32 in bytes - else: - dtype_bytes = 8 # size of float64 in bytes - - ntime = src.coord('time').shape[0] - tgt_nlat = tgt.coord('latitude').shape[0] - tgt_nlon = tgt.coord('longitude').shape[0] - - # Define blocks along the time dimension - min_nblocks = int(ntime * tgt_nlat * tgt_nlon * dtype_bytes / block_bytes) - min_nblocks = max(min_nblocks, 1) - timefull = ntime // min_nblocks - timepart = ntime % timefull - - nfullblocks = ntime // timefull - npartblocks = int(timepart > 0) - - time_chunks = (timefull, ) * nfullblocks + (timepart, ) * npartblocks - src_chunks = ( - time_chunks, - (src.coord('latitude').shape[0], ), - (src.coord('longitude').shape[0], ), - ) - tgt_chunks = ( - time_chunks, - (tgt_nlat, ), - (tgt_nlon, ), - ) - - return src_chunks, tgt_chunks - - -def _regrid_data(src, tgt, scheme): - """Regrid data from cube src onto grid of cube tgt.""" - src_chunks, tgt_chunks = _compute_chunks(src, tgt) - - # Define the block regrid function - if scheme not in HORIZONTAL_SCHEMES: - raise ValueError(f"Regridding scheme {scheme} not supported, " - f"choose from {HORIZONTAL_SCHEMES.keys()}.") - regridder = HORIZONTAL_SCHEMES[scheme].regridder(src, tgt) - - def regrid(block): - tlen = block.shape[0] - cube = src[:tlen].copy(block) - return regridder(cube).core_data() - - # Regrid - data = src.core_data().rechunk(src_chunks).map_blocks( - regrid, - dtype=src.dtype, - chunks=tgt_chunks, - ) - - return data - - -def lazy_regrid(src, tgt, scheme): - """Regrid cube src onto the grid of cube tgt.""" - data = _regrid_data(src, tgt, scheme) - - result = iris.cube.Cube(data) - result.metadata = copy.deepcopy(src.metadata) - - def copy_coords(src_coords, add_method): - for coord in src_coords: - dims = src.coord_dims(coord) - if coord == src.coord('longitude'): - coord = tgt.coord('longitude') - elif coord == src.coord('latitude'): - coord = tgt.coord('latitude') - result_coord = coord.copy() - add_method(result_coord, dims) - - copy_coords(src.dim_coords, result.add_dim_coord) - copy_coords(src.aux_coords, result.add_aux_coord) - - return result diff --git a/esmvaltool/diag_scripts/hydrology/wflow.py b/esmvaltool/diag_scripts/hydrology/wflow.py index 58a7ddfd94..d3de40151d 100644 --- a/esmvaltool/diag_scripts/hydrology/wflow.py +++ b/esmvaltool/diag_scripts/hydrology/wflow.py @@ -2,12 +2,13 @@ import logging from pathlib import Path -import iris import numpy as np from osgeo import gdal +import iris +from esmvalcore.preprocessor import regrid from esmvaltool.diag_scripts.hydrology.derive_evspsblpot import debruin_pet -from esmvaltool.diag_scripts.hydrology.lazy_regrid import lazy_regrid +from esmvaltool.diag_scripts.hydrology.compute_chunks import compute_chunks from esmvaltool.diag_scripts.shared import (ProvenanceLogger, get_diagnostic_filename, group_metadata, run_diagnostic) @@ -39,6 +40,13 @@ def create_provenance_record(): return record +def rechunk_and_regrid(src, tgt, scheme): + """Rechunk cube src and regrid it onto the grid of cube tgt.""" + src_chunks = compute_chunks(src, tgt) + src.data = src.lazy_data().rechunk(src_chunks) + return regrid(src, tgt, scheme) + + def get_input_cubes(metadata): """Create a dict with all (preprocessed) input files.""" provenance = create_provenance_record() @@ -97,7 +105,7 @@ def regrid_temperature(src_temp, src_height, target_height, scheme): src_slt = src_temp.copy(data=src_temp.core_data() + src_dtemp.core_data()) # Interpolate sea-level temperature to target grid - target_slt = lazy_regrid(src_slt, target_height, scheme) + target_slt = rechunk_and_regrid(src_slt, target_height, scheme) # Convert sea-level temperature to new target elevation target_dtemp = lapse_rate_correction(target_height) @@ -216,7 +224,7 @@ def main(cfg): logger.info("Processing variable precipitation_flux") scheme = cfg['regrid'] - pr_dem = lazy_regrid(all_vars['pr'], dem, scheme) + pr_dem = rechunk_and_regrid(all_vars['pr'], dem, scheme) logger.info("Processing variable temperature") tas_dem = regrid_temperature( @@ -229,12 +237,12 @@ def main(cfg): logger.info("Processing variable potential evapotranspiration") if 'evspsblpot' in all_vars: pet = all_vars['evspsblpot'] - pet_dem = lazy_regrid(pet, dem, scheme) + pet_dem = rechunk_and_regrid(pet, dem, scheme) else: logger.info("Potential evapotransporation not available, deriving") - psl_dem = lazy_regrid(all_vars['psl'], dem, scheme) - rsds_dem = lazy_regrid(all_vars['rsds'], dem, scheme) - rsdt_dem = lazy_regrid(all_vars['rsdt'], dem, scheme) + psl_dem = rechunk_and_regrid(all_vars['psl'], dem, scheme) + rsds_dem = rechunk_and_regrid(all_vars['rsds'], dem, scheme) + rsdt_dem = rechunk_and_regrid(all_vars['rsdt'], dem, scheme) pet_dem = debruin_pet( tas=tas_dem, psl=psl_dem, diff --git a/esmvaltool/recipes/hydrology/recipe_wflow.yml b/esmvaltool/recipes/hydrology/recipe_wflow.yml index f568901a67..dc9bb148a6 100644 --- a/esmvaltool/recipes/hydrology/recipe_wflow.yml +++ b/esmvaltool/recipes/hydrology/recipe_wflow.yml @@ -4,17 +4,17 @@ documentation: description: | Pre-processes climate data for the WFlow hydrological model. - + authors: - kalverla_peter - camphuijsen_jaro - alidoost_sarah - aerts_jerom - andela_bouwe - + projects: - ewatercycle - + references: - acknow_project @@ -40,7 +40,7 @@ diagnostics: mip: day preprocessor: rough_cutout start_year: 1990 - end_year: 1990 + end_year: 2001 pr: *daily_var # evspsblpot: # doesn't exist for ERA-Interim. # Reconstruct evspsblpot using: @@ -53,5 +53,5 @@ diagnostics: script: script: hydrology/wflow.py basin: Meuse - dem_file: 'wflow/wflow_dem_Meuse.nc' + dem_file: 'wflow_parameterset/meuse/staticmaps/wflow_dem.map' regrid: area_weighted