From 998ce6cf0cc3679e9b7c98560b8cca66aa05de2d Mon Sep 17 00:00:00 2001 From: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> Date: Thu, 30 Jan 2025 20:41:13 +0000 Subject: [PATCH] FLOPROS flood protection as worked example of support for NetCDF-style coordinates (#135) * Add onboarding of FLOPROS Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Linting changes Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Tidy and document Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Make NetCDF coordinate writing consistent with current standard; align treatment of index dimension. Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Documentation and add new field Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Regenerate inventory; lint Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Fix missing option Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Update src/hazard/sources/osc_zarr.py Co-authored-by: Jonas Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Update src/hazard/sources/osc_zarr.py Co-authored-by: Jonas Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Update src/hazard/onboard/flopros_flood.py Co-authored-by: Jonas Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Chore: pre-commit autoupdate * Update src/hazard/utilities/tiles.py Co-authored-by: Jonas Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Avoid slow write Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Rename HazardResource field Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> * Chore: pre-commit autoupdate --------- Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> Co-authored-by: Jonas Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- src/hazard/inventory.py | 6 + src/hazard/models/days_tas_above.py | 1 + src/hazard/models/drought_index.py | 1 + src/hazard/models/water_temp.py | 1 + src/hazard/onboard/csm_subsidence.py | 1 + src/hazard/onboard/flopros_flood.md | 1 + src/hazard/onboard/flopros_flood.py | 271 +++++++++++++++++++++ src/hazard/onboard/tudelft_flood.py | 9 +- src/hazard/onboard/wri_aqueduct_flood.py | 2 +- src/hazard/sources/osc_zarr.py | 115 ++++++--- src/hazard/utilities/download_utilities.py | 5 +- src/hazard/utilities/tiles.py | 31 ++- src/hazard/utilities/xarray_utilities.py | 43 ++-- src/inventories/hazard/inventory.json | 116 +++++++++ tests/test_inventory.py | 2 + tests/test_onboarding.py | 20 +- tests/test_utilities.py | 55 +++++ 17 files changed, 603 insertions(+), 77 deletions(-) create mode 100644 src/hazard/onboard/flopros_flood.md create mode 100644 src/hazard/onboard/flopros_flood.py diff --git a/src/hazard/inventory.py b/src/hazard/inventory.py index 4e722b9..770aac4 100644 --- a/src/hazard/inventory.py +++ b/src/hazard/inventory.py @@ -105,6 +105,12 @@ class HazardResource(BaseModel): map: Optional[MapInfo] = Field( description="Optional information used for display of the indicator in a map." ) + store_netcdf_coords: bool = Field( + False, + description="If True, NetCDF-style coordinates are also stored, which allows XArray to read the array \ + natively. 
In this case, path still points to the array; the coordinates are stored in an array group \ in the parent folder. That is, path should be in the form path_to_array_group/array_group/array",
)
scenarios: List[Scenario] = Field( description="Climate change scenarios for which the indicator is available." )
diff --git a/src/hazard/models/days_tas_above.py b/src/hazard/models/days_tas_above.py index 99f3dd2..bb9e07f 100644 --- a/src/hazard/models/days_tas_above.py +++ b/src/hazard/models/days_tas_above.py
@@ -136,6 +136,7 @@ def _resource(self) -> HazardResource: source="map_array", ), units="days/year",
+ store_netcdf_coords=False,
scenarios=scenarios, ) return resource
diff --git a/src/hazard/models/drought_index.py b/src/hazard/models/drought_index.py index e36637e..c01d370 100644 --- a/src/hazard/models/drought_index.py +++ b/src/hazard/models/drought_index.py
@@ -500,6 +500,7 @@ def _resource(self) -> HazardResource: source="map_array_pyramid", ), units="months/year",
+ store_netcdf_coords=False,
scenarios=[ # Scenario( # id="historical",
diff --git a/src/hazard/models/water_temp.py b/src/hazard/models/water_temp.py index 6e06c70..0f6af9e 100644 --- a/src/hazard/models/water_temp.py +++ b/src/hazard/models/water_temp.py
@@ -353,6 +353,7 @@ def _other_resource(self) -> HazardResource: group_id=self.resource.group_id, map=map, units=self.resource.units,
+ store_netcdf_coords=False,
scenarios=[ Scenario(id="historical", years=[self.central_year_historical]), ],
diff --git a/src/hazard/onboard/csm_subsidence.py b/src/hazard/onboard/csm_subsidence.py index 6a16339..e007491 100644 --- a/src/hazard/onboard/csm_subsidence.py +++ b/src/hazard/onboard/csm_subsidence.py
@@ -130,6 +130,7 @@ def inventory(self) -> Iterable[HazardResource]: source="map_array_pyramid", ), units="millimetres/year",
+ store_netcdf_coords=False,
scenarios=[Scenario(id="historical", years=[2021])], ) ]
diff --git a/src/hazard/onboard/flopros_flood.md b/src/hazard/onboard/flopros_flood.md new file mode 100644 index 0000000..3795c6e --- /dev/null +++ b/src/hazard/onboard/flopros_flood.md
@@ -0,0 +1 @@
+Global data set of protection levels for riverine and coastal flood, based on the [FLOPROS database](https://nhess.copernicus.org/articles/16/1049/2016/). For each of coastal and riverine inundation, the dataset provides for every location a minimum ("min") and maximum ("max") protection level, specified as a return period in years; the min and max reflect the uncertainty in the protection level. The return period indicates that the location is protected against flood events with that return period (e.g. 100 years indicates that the location is protected against 1-in-100 year flood events). Finding the equivalent flood depth protection level additionally requires a flood depth indicator data set.
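+
+As an illustration of the intended use (this example is not part of the FLOPROS database): combining a protection level with a flood depth indicator amounts to interpolating the location's return-period/depth curve at the protected return period. The `protected_depth` helper below is hypothetical, and the log-linear interpolation in return period is an assumed choice.
+
+```python
+import numpy as np
+
+
+def protected_depth(sop_years: float, return_periods: np.ndarray, depths: np.ndarray) -> float:
+    """Interpolate a flood depth curve at the protection level (a return period in years)."""
+    if np.isnan(sop_years):
+        return float("nan")  # protection level unknown at this location
+    # log-linear interpolation in return period (assumed convention)
+    return float(np.interp(np.log(sop_years), np.log(return_periods), depths))
+
+
+# a location protected against 1-in-100 year events, with depths for 10/100/1000 year events:
+depth = protected_depth(100.0, np.array([10.0, 100.0, 1000.0]), np.array([0.2, 0.9, 1.8]))  # 0.9
+```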
diff --git a/src/hazard/onboard/flopros_flood.py b/src/hazard/onboard/flopros_flood.py new file mode 100644 index 0000000..5c07507 --- /dev/null +++ b/src/hazard/onboard/flopros_flood.py
@@ -0,0 +1,271 @@
+import logging
+import os
+import math
+from pathlib import Path, PurePosixPath
+from typing import Any, Dict, Iterable, List, Optional, Tuple
+
+import geopandas as gpd
+from dask.distributed import Client
+from fsspec.implementations.local import LocalFileSystem
+from fsspec.spec import AbstractFileSystem
+from rasterio import features
+from rasterio.enums import MergeAlg
+import xarray as xr
+
+from hazard.indicator_model import IndicatorModel
+from hazard.inventory import Colormap, HazardResource, MapInfo, Scenario
+from hazard.protocols import OpenDataset, ReadWriteDataArray
+from hazard.sources.osc_zarr import OscZarr
+from hazard.utilities.download_utilities import download_and_unzip
+from hazard.utilities.tiles import create_tiles_for_resource
+from hazard.utilities.xarray_utilities import (
+ empty_data_array,
+ global_crs_transform,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+class FLOPROSFloodStandardOfProtectionSource(OpenDataset):
+ def __init__(self, source_dir, fs: Optional[AbstractFileSystem] = None):
+ """Source that can provide FLOPROS data as an XArray raster.
+
+ Args:
+ source_dir (str): Directory containing source files. If fs is an S3FileSystem instance,
+ a path of the form <bucket name>/<prefix> is expected.
+ fs (Optional[AbstractFileSystem], optional): AbstractFileSystem instance.
+ If None, a LocalFileSystem is used.
+ """
+ self.fs = fs if fs else LocalFileSystem()
+ self.source_dir = source_dir
+ self.zip_url = "https://nhess.copernicus.org/articles/16/1049/2016/nhess-16-1049-2016-supplement.zip"
+ self.archive_name = self.zip_url.split("/")[-1].split(".")[0]
+ self.prepare()
+
+ def prepare(self, working_dir: Optional[str] = None):
+ if not isinstance(self.fs, LocalFileSystem):
+ # e.g. we are copying to S3; download to specified working directory, but then copy to self.source_dir
+ assert working_dir is not None
+ download_and_unzip(self.zip_url, working_dir, self.archive_name)
+ for file in os.listdir(working_dir):
+ with open(os.path.join(working_dir, file), "rb") as f:
+ self.fs.write_bytes(PurePosixPath(self.source_dir, file), f.read())
+ else:
+ # download and unzip directly in location
+ download_and_unzip(self.zip_url, self.source_dir, self.archive_name)
+ logger.info("Reading database into GeoDataFrame")
+ path = ( Path(self.source_dir) / self.archive_name / "Scussolini_etal_Suppl_info" / "FLOPROS_shp_V1" / "FLOPROS_shp_V1.shp" )
+ self.df = gpd.read_file(path)
+
+ def open_dataset_year( self, gcm: str, scenario: str, quantity: str, year: int, chunks=None ) -> xr.Dataset:
+ """Rasterize the FLOPROS database for the given hazard type.
+
+ Args:
+ gcm (str): Ignored.
+ scenario (str): Ignored.
+ quantity (str): 'RiverineInundation' or 'CoastalInundation'.
+ year (int): Ignored.
+ chunks (optional): Ignored. Defaults to None.
+
+ Returns:
+ xr.Dataset: Data set containing array named 'sop' with 'min' and 'max' coordinate labels in the 'min_max' index coordinate.
+ """
+ hazard_type = quantity
+
+ def get_merged_rp(row, min_max: str, flood_type: str):
+ """Calculate min or max from database entry (GeoDataFrame row).
+
+ From the paper: "In practice, if information is available in the design layer for a given sub-country unit, then
+ this information is included in the merged layer. If no information is contained in the design layer, then the policy layer
+ information is included in the merged layer.
Finally, if information is not available even at the policy layer, then the
+ model layer information is included in the merged layer."
+
+ Args:
+ row: GeoDataFrame row.
+ min_max (str): "Min" or "Max".
+ flood_type (str): "Riv" or "Co".
+
+ Returns:
+ float: Protection level as return period in years.
+ """
+ layers = ["DL", "PL", "ModL"] if flood_type == "Riv" else ["DL", "PL"]
+ for layer in layers: # design layer, policy layer, modelled layer
+ # note that for the modelled layer, both min and max are set to the modelled value
+ layer_rp = ( row[f"{layer}_{flood_type}"] if layer == "ModL" else row[f"{layer}_{min_max}_{flood_type}"] )
+ if layer_rp > 0: # if 0, layer is considered missing
+ # if the design layer is present, use this, otherwise the policy layer, otherwise the modelled layer, otherwise missing
+ return layer_rp
+ return float("nan") # zero is no data, represented by NaN here.
+
+ logger.info(f"Processing hazard type {hazard_type}")
+ min_shapes: List[Tuple[Any, float]] = []
+ max_shapes: List[Tuple[Any, float]] = []
+ logger.info("Inferring max and min protection levels per region")
+ for _, row in self.df.iterrows():
+ flood_type = ( "Riv" if hazard_type == "RiverineInundation" else "Co" ) # riverine and coastal
+ min_rp, max_rp = ( get_merged_rp(row, "Min", flood_type), get_merged_rp(row, "Max", flood_type), )
+ if row["name"] is None and math.isnan(min_rp) and math.isnan(max_rp):
+ continue
+ # if either the min or max is NaN, that is OK: the vulnerability model is expected to deal with that
+ if not math.isnan(min_rp) and not math.isnan(max_rp) and min_rp > max_rp:
+ # it can occur that for a layer there is only information about the minimum
+ raise ValueError("unexpected return period")
+
+ if not math.isnan(min_rp):
+ min_shapes.append((row.geometry, min_rp))
+ if not math.isnan(max_rp):
+ max_shapes.append((row.geometry, max_rp))
+
+ resolution_in_arc_mins = 1
+ width, height = ( int(60 * 360 / resolution_in_arc_mins), int(60 * 180 / resolution_in_arc_mins), )
+ crs, transform = global_crs_transform(width, height)
+ logger.info("Creating empty array")
+ da = empty_data_array( width, height, transform, str(crs), index_name="min_max", index_values=["min", "max"], )
+ for min_max in ["min", "max"]:
+ logger.info( f"Creating raster at {(360 * 60) / width} arcmin resolution for {min_max} protection" )
+ rasterized = features.rasterize( min_shapes if min_max == "min" else max_shapes, out_shape=[height, width], transform=transform, all_touched=True, fill=float("nan"), # background value
+ merge_alg=MergeAlg.replace, )
+ index = 0 if min_max == "min" else 1
+ da[index, :, :] = rasterized[:, :]
+ return da.to_dataset(name="sop")
+
+
+class FLOPROSFloodStandardOfProtection(IndicatorModel[str]):
+ def __init__(self):
+ """
+ Flood protection standards expressed as return period.
+
+ METADATA:
+ Link: https://nhess.copernicus.org/articles/16/1049/2016/
+ Data type: Global database of flood protection standards.
+ Hazard indicator: Riverine and coastal flood
+ Region: Global
+ Resolution: Country / country unit
+ Scenarios: Not applicable
+ Time range: Not applicable
+ File type: Shape File (.shx)
+
+ DATA DESCRIPTION:
+ FLOPROS (FLOod PROtection Standards) comprises information in the form of the flood return period
+ associated with protection measures, at different spatial scales. It comprises three layers
+ of information, and combines them into one consistent database.
The design layer contains empirical + information about the actual standard of existing protection already in place; the policy layer contains + information on protection standards from policy regulations; and the model layer uses a validated modelling + approach to calculate protection standards. + """ + + def batch_items(self): + """Get a list of all batch items.""" + return ["min_max"] # just one! + + def run_single( + self, item: str, source: Any, target: ReadWriteDataArray, client: Client + ): + assert isinstance(source, FLOPROSFloodStandardOfProtectionSource) + logger.info("Writing rasters") + for hazard_type, resource in self._resources().items(): + ds = source.open_dataset_year("", "", hazard_type, -1) + array_name = "sop" + # note that the co-ordinates will be written into the parent of resource.path + target.write( + resource.path, + ds[array_name].compute(), + spatial_coords=resource.store_netcdf_coords, + ) + + def create_maps(self, source: OscZarr, target: OscZarr): + """Create map images.""" + for resource in self.inventory(): + create_tiles_for_resource( + source, + target, + resource, + nodata_as_zero=True, + nodata_as_zero_coarsening=True, + ) + + def inventory(self) -> Iterable[HazardResource]: + """Get the inventory item(s).""" + return self._resources().values() + + def _resources(self) -> Dict[str, HazardResource]: + """Create resource.""" + with open( + os.path.join(os.path.dirname(__file__), "flopros_flood.md"), "r" + ) as f: + description = f.read() + + def path_component(hazard_type: str): + return "riverine" if hazard_type == "RiverineInundation" else "coastal" + + return { + k: HazardResource( + hazard_type=k, + indicator_id="flood_sop", + indicator_model_id="flopros", + indicator_model_gcm="", + path=( + "inundation/flopros_" + path_component(k) + "/v1/flood_sop/sop" + ), # the double path allows an XArray-readable data array to be written + params={}, + display_name="Standard of protection (FLOPROS)", + description=description, + group_id="", + display_groups=[], + map=MapInfo( + bbox=[], + bounds=[], + colormap=Colormap( + max_index=255, + min_index=1, + nodata_index=0, + name="flare", + min_value=0.0, + max_value=1500.0, + units="years", + ), + index_values=None, + path=f"maps/inundation/flopros_{path_component(k)}/v1/flood_sop_map", + source="map_array_pyramid", + ), + units="years", + store_netcdf_coords=True, + scenarios=[Scenario(id="historical", years=[1985])], + ) + for k in ["RiverineInundation", "CoastalInundation"] + } diff --git a/src/hazard/onboard/tudelft_flood.py b/src/hazard/onboard/tudelft_flood.py index 2d8bfe2..cbfbf8a 100644 --- a/src/hazard/onboard/tudelft_flood.py +++ b/src/hazard/onboard/tudelft_flood.py @@ -190,7 +190,7 @@ def run_single( da_sop.rio.transform(), str(da_sop.crs), index_name="standard of protection (years)", - indexes=["min", "max"], + index_values=["min", "max"], ) values_max_sop = np.array(da_sop.data, dtype="float32") sop_no_data = da_sop.attrs["nodatavals"] @@ -215,7 +215,7 @@ def run_single( shape[0], da_depth.rio.transform(), str(da_depth.crs), - indexes=self.return_periods, + index_values=self.return_periods, ) if da_depth.shape[1] == 38375: da_depth = da_depth[:, 0:38374] @@ -306,6 +306,7 @@ def inventory(self) -> Iterable[HazardResource]: source="map_array_pyramid", ), units="metres", + store_netcdf_coords=False, scenarios=[ Scenario(id="historical", years=[1985]), Scenario(id="rcp4p5", years=[2035, 2085]), @@ -340,6 +341,7 @@ def inventory(self) -> Iterable[HazardResource]: source="map_array_pyramid", ), 
units="years", + store_netcdf_coords=False, scenarios=[ Scenario(id="historical", years=[1985]), Scenario(id="rcp4p5", years=[2035, 2085]), @@ -464,7 +466,7 @@ def run_single( shape[0], da.rio.transform(), str(da.crs), - indexes=self.return_periods, + index_values=self.return_periods, ) values = da.data no_data = da.attrs["nodatavals"] @@ -512,6 +514,7 @@ def inventory(self) -> Iterable[HazardResource]: source="map_array_pyramid", ), units="metres", + store_netcdf_coords=False, scenarios=[ Scenario(id="historical", years=[1971]), Scenario(id="rcp4p5", years=[2035, 2085]), diff --git a/src/hazard/onboard/wri_aqueduct_flood.py b/src/hazard/onboard/wri_aqueduct_flood.py index 04b92d6..eeb67c7 100644 --- a/src/hazard/onboard/wri_aqueduct_flood.py +++ b/src/hazard/onboard/wri_aqueduct_flood.py @@ -568,7 +568,7 @@ def run_single( da.transform[5], ), str(da.crs), - indexes=self.return_periods, + index_values=self.return_periods, ) # ('band', 'y', 'x') values = da[ diff --git a/src/hazard/sources/osc_zarr.py b/src/hazard/sources/osc_zarr.py index 977da9c..65afff6 100644 --- a/src/hazard/sources/osc_zarr.py +++ b/src/hazard/sources/osc_zarr.py @@ -68,18 +68,18 @@ def create_empty( transform: Affine, crs: str, overwrite=False, - index_name: Optional[str] = None, - indexes=[0], + index_name: Optional[str] = "index", + index_values: Any = [0], chunks=None, ): return self._zarr_create( path, - (len(indexes), height, width), + (len(index_values), height, width), transform, str(crs), overwrite, index_name=index_name, - indexes=indexes, + index_values=index_values, chunks=chunks, ) @@ -149,9 +149,16 @@ def write( spatial_coords: Optional[bool] = True, ): if self.write_xarray_compatible_zarr and spatial_coords: - self.write_data_array(f"{path}_xarray", da) - - self.write_zarr(path, da, chunks) + pp = PurePosixPath(path) + if da.name != pp.name: + raise ValueError( + f"when writing NetCDF style coordinates, final element of path (here {pp.name}) must be \ + the same as the array name (here {da.name})" + ) + parent_path = pp.parent + self.write_data_array(str(parent_path), da) + else: + self.write_zarr(path, da, chunks) def write_zarr( self, path: str, da: xr.DataArray, chunks: Optional[List[int]] = None @@ -162,6 +169,10 @@ def write_zarr( path (str): Relative path. da (xr.DataArray): The DataArray. """ + if "lon" not in da.dims and "longitude" not in da.dims and "x" not in da.dims: + raise ValueError("longitude or x dimension not found.") + if "lat" not in da.dims and "latitude" not in da.dims and "y" not in da.dims: + raise ValueError("latitude dimension not found.") da_norm = xarray_utilities.normalize_array(da) data, transform, crs = xarray_utilities.get_array_components( da_norm, assume_normalized=True @@ -170,8 +181,11 @@ def write_zarr( path, da_norm.shape, transform, - crs.to_string(), - indexes=da_norm.index.data, + str(crs), + index_name=str(da_norm.dims[0]), + index_values=da_norm[ + da_norm.dims[0] + ].data, # the index dimension; allow to have name other than 'index' chunks=chunks, ) z[:, :, :] = data[:, :, :] @@ -197,9 +211,11 @@ def new_empty_like(self, path: str, da: xr.DataArray) -> xr.DataArray: path, da.shape, transform, - crs.to_string(), + str(crs), overwrite=True, - indexes=da.coords["index"].data, + index_values=da[ + da.dims[0] + ].data, # the index dimension; alow to have name other than 'index' ) return self._data_array_from_zarr(z) @@ -215,19 +231,31 @@ def write_data_array(self, path: str, da: xr.DataArray): da (xr.DataArray): The DataArray. 
""" # we expect the data to be called 'data' - if "lon" not in da.dims and "longitude" not in da.dims: - raise ValueError("longitude dimension not found.") - if "lat" not in da.dims and "latitude" not in da.dims: - raise ValueError("latitude dimension not found.") - try: - renamed = da.rename({"lat": "latitude", "lon": "longitude"}) - except Exception: - renamed = da - renamed.name = "data" - _, transform, crs = xarray_utilities.get_array_components(renamed) - self._add_attributes(renamed.attrs, transform, crs.to_string()) - renamed.to_dataset().to_zarr( - self.root.store, compute=True, group=path, mode="w", consolidated=True + if "lon" not in da.dims and "longitude" not in da.dims and "x" not in da.dims: + raise ValueError("longitude or x dimension not found.") + if "lat" not in da.dims and "latitude" not in da.dims and "y" not in da.dims: + raise ValueError("latitude or y dimension not found.") + da_norm = xarray_utilities.normalize_array(da) + _, transform, crs = xarray_utilities.get_array_components( + da_norm, assume_normalized=True + ) + self._add_attributes( + da_norm.attrs, + transform, + str(crs), + index_name=str(da_norm.dims[0]), + index_values=da_norm[da_norm.dims[0]].data, + ) + options = {"write_empty_chunks": False} + if da.chunks is None: + options["chunks"] = self._chunks(da_norm[da_norm.dims[0]].data) + da_norm.to_dataset().to_zarr( + self.root.store, + compute=True, + group=path, + mode="w", + consolidated=False, + encoding={da_norm.name: options}, ) @staticmethod @@ -249,7 +277,7 @@ def _zarr_create( crs: str, overwrite=False, index_name: Optional[str] = None, - indexes=None, + index_values: Any = None, chunks=None, ): """ @@ -262,12 +290,15 @@ def _zarr_create( if len(shape) == 3: zarr_shape = shape elif len(shape) == 2: - zarr_shape = (1 if indexes is None else len(indexes), shape[0], shape[1]) + zarr_shape = ( + 1 if index_values is None else len(index_values), + shape[0], + shape[1], + ) else: raise ValueError("shape of DataArray must have length of 2 or 3.") if chunks is None: - chunk_dim = 1000 if len(indexes) < 10 else 500 - chunks = (1 if indexes is None else len(indexes), chunk_dim, chunk_dim) + chunks = self._chunks(index_values) z = self.root.create_dataset( path, shape=zarr_shape, @@ -277,14 +308,14 @@ def _zarr_create( write_empty_chunks=False, fill_value=float("nan"), ) # array_path interpreted as path within group - if isinstance(indexes, np.ndarray) and indexes.dtype in [ + if isinstance(index_values, np.ndarray) and index_values.dtype in [ "int16", "int32", "int64", ]: - indexes = [int(i) for i in indexes] + index_values = [int(i) for i in index_values] self._add_attributes( - z.attrs, transform, crs, index_name=index_name, indexes=indexes + z.attrs, transform, crs, index_name=index_name, index_values=index_values ) return z @@ -294,7 +325,8 @@ def _add_attributes( transform: Affine, crs: str, index_name: Optional[str] = None, - indexes=None, + index_values: Any = None, + index_units: str = "", ): trans_members = [ transform.a, @@ -309,13 +341,20 @@ def _add_attributes( use_xy = crs.upper() != "EPSG:4326" attrs["transform_mat3x3"] = mat3x3 attrs["dimensions"] = ( - ["index", "y", "x"] if use_xy else ["index", "latitude", "longitude"] + [index_name, "y", "x"] if use_xy else [index_name, "latitude", "longitude"] ) - if indexes is not None: - attrs["index_values"] = list(indexes) - attrs["index_name"] = ( - index_name if index_name is not None else "return period (years)" - ) + if index_values is not None: + attrs[f"{index_name}_values"] = 
list(index_values)
+ attrs[f"{index_name}_units"] = index_units
+
+ def _chunks(self, index_values: Optional[List[Any]]):
+ n_indices = 1 if index_values is None else len(index_values)
+ chunk_dim = 1000 if n_indices < 10 else 500
+ return (n_indices, chunk_dim, chunk_dim)
@staticmethod def normalize_dims(da: xr.DataArray) -> xr.DataArray:
diff --git a/src/hazard/utilities/download_utilities.py b/src/hazard/utilities/download_utilities.py index 7dd73eb..c18f457 100644 --- a/src/hazard/utilities/download_utilities.py +++ b/src/hazard/utilities/download_utilities.py
@@ -1,4 +1,5 @@ import os
+from pathlib import Path
import re import zipfile from typing import Optional
@@ -16,7 +17,9 @@ def download_file(url: str, directory: str, filename: Optional[str] = None): "filename not provided and cannot infer from content-disposition" ) r.raise_for_status()
- with open(os.path.join(directory, filename), "wb") as f:
+ dir_path = Path(directory)
+ dir_path.mkdir(exist_ok=True, parents=True)
+ with open(dir_path / filename, "wb") as f:
for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return filename
diff --git a/src/hazard/utilities/tiles.py b/src/hazard/utilities/tiles.py index 4a8b3e3..0be7e6a 100644 --- a/src/hazard/utilities/tiles.py +++ b/src/hazard/utilities/tiles.py
@@ -2,7 +2,7 @@ import math import os import posixpath
-from typing import Optional, Sequence, Tuple
+from typing import Any, Optional, Sequence, Tuple
import mercantile import numpy as np # type: ignore
@@ -62,7 +62,8 @@ def create_tiles_for_resource( map_path = resource.map.path.format(scenario=scenario.id, year=year) if resource.map.index_values is not None: da = source.read(path)
- indexes = list(da["index"].values)
+ index_dim = da.dims[0] # should be the index dimension
+ indexes = list(da[index_dim].values)
indices = [indexes.index(v) for v in resource.map.index_values] create_tile_set( source,
@@ -141,7 +142,7 @@ def _create_image_set( dst_height, dst_transform, dst_crs,
- indexes=return_periods,
+ index_values=return_periods,
chunks=(1, 4000, 4000), )
@@ -204,13 +205,10 @@ def create_tile_set( raise ValueError("invalid target path {target_path}") da = source.read(source_path) # .to_dataset()
- return_periods = da.index.data
+ index_values = da[da.dims[0]].data
+
+ src_width = da.sizes["longitude"] if "longitude" in da.sizes else da.sizes["x"]
- _, _, src_width = (
- da.sizes["index"],
- da.sizes["latitude"] if "latitude" in da.sizes else da.sizes["y"],
- da.sizes["longitude"] if "longitude" in da.sizes else da.sizes["x"],
- )
chunk_size: int = 512 pixels_per_tile = 256 os.environ["CHECK_WITH_INVERT_PROJ"] = "YES"
@@ -233,7 +231,7 @@ if indices is None: index_slice = slice(-1, None)
- indices = range(len(return_periods))[index_slice]
+ indices = range(len(index_values))[index_slice]
logger.info(f"Starting map tiles generation for array {source_path}.") logger.info(f"Source array size is {da.shape} (z, y, x).") logger.info(f"Indices (z) subset to be processed: {indices}.")
# _coarsen(target, target_path, max_zoom, (left, bottom, right, top), indices) target.remove(target_path) _create_empty_tile_pyramid(
- target, target_path, max_zoom, chunk_size, return_periods
+ target,
+ target_path,
+ max_zoom,
+ chunk_size,
+ index_name=str(da.dims[0]),
+ index_values=index_values,
) # here we reproject and write the maximum zoom level
@@ -275,7 +278,8 @@ def _create_empty_tile_pyramid( target: OscZarr, target_path: str, max_zoom: int, chunk_size: int,
- return_periods: Sequence[float],
+ index_name: str = "index",
+ index_values: Sequence[Any] = [0],
): pixels_per_tile: int = 256 ulx, uly = mercantile.xy(*mercantile.ul(mercantile.Tile(x=0, y=0, z=0)))
@@ -297,7 +301,8 @@ dst_dim, whole_map_transform, dst_crs,
- indexes=return_periods,
+ index_name=index_name,
+ index_values=index_values,
chunks=(1, chunk_size, chunk_size), )
diff --git a/src/hazard/utilities/xarray_utilities.py b/src/hazard/utilities/xarray_utilities.py index 70f55e0..e379141 100644 --- a/src/hazard/utilities/xarray_utilities.py +++ b/src/hazard/utilities/xarray_utilities.py
@@ -126,8 +126,8 @@ def data_array_from_zarr(z: zarr.core.Array) -> xr.DataArray: t = z.attrs["transform_mat3x3"] # type: ignore crs: str = z.attrs.get("crs", "EPSG:4326") # type: ignore transform = Affine(t[0], t[1], t[2], t[3], t[4], t[5])
- index_values = z.attrs.get("index_values", [0])
- # index_name = z.attrs.get("index_name", [0])
+ index_name = z.attrs.get("dimensions", ["index"])[0]
+ index_values = z.attrs.get(index_name + "_values", [0])
if index_values is None: index_values = [0] coords = affine_to_coords(
@@ -139,10 +139,10 @@ def data_array_from_zarr(z: zarr.core.Array) -> xr.DataArray: ) if "EPSG:4326" in crs.upper(): da.rio.write_crs(4326, inplace=True)
- da = da.rename({"dim_0": "index", "dim_1": "latitude", "dim_2": "longitude"})
+ da = da.rename({"dim_0": index_name, "dim_1": "latitude", "dim_2": "longitude"})
else: da.rio.write_crs(crs, inplace=True)
- da = da.rename({"dim_0": "index", "dim_1": "y", "dim_2": "x"})
+ da = da.rename({"dim_0": index_name, "dim_1": "y", "dim_2": "x"})
return da
@@ -178,7 +178,7 @@ def get_array_components( crs = da.rio.crs if crs is None: # assumed default
- crs = rasterio.CRS.from_epsg(4326)
+ crs = da.attrs.get("crs", rasterio.CRS.from_epsg(4326))
return (data, transform, crs)
@@ -216,8 +216,8 @@ def normalize_array(da: xr.DataArray) -> xr.DataArray: # noqa: C901 if len(da.dims) == 3: if len(index_dim) != 1: raise Exception(f"unexpected dims {da.dims}")
- da_norm.set_index({"index": index_dim[0]})
- da_norm = da_norm.transpose("index", dim_y, dim_x)
+ # keep the same index name, but it has to be the first dimension
+ da_norm = da_norm.transpose(index_dim[0], dim_y, dim_x)
elif len(da.dims) == 2: da_norm = da_norm.expand_dims(dim={"index": 1}, axis=0) da_norm = da_norm.transpose("index", dim_y, dim_x)
@@ -229,9 +229,6 @@ elif "y" in da_norm.dims and da_norm.y[-1] > da_norm.y[0]: da_norm = da_norm.reindex(y=da_norm.y[::-1])
- if not da_norm.rio.crs:
- da_norm.rio.write_crs(4326, inplace=True)
if "longitude" in da_norm.dims and np.any(da_norm.longitude > 180): longitude = da_norm.longitude longitude = np.where(longitude > 180, longitude - 360, longitude)
@@ -245,18 +242,23 @@ def data_array( data: np.ndarray, transform: Affine, crs: str = "EPSG:4326",
- index_values: List[float] = [0],
+ name: str = "data",
+ index_name: str = "index",
+ index_units: str = "",
+ index_values: List[Any] = [0],
): n_indices, height, width = data.shape assert len(index_values) == n_indices
- z_dim = "index"
+ z_dim = index_name
y_dim, x_dim = ( ("latitude", "longitude") if crs.upper() == "EPSG:4326" else ("y", "x") ) coords = affine_to_coords(transform, width, height, x_dim=x_dim, y_dim=y_dim)
- coords[z_dim] = np.array(index_values, dtype=float)
- da = xr.DataArray(data=data, coords=coords, dims=[z_dim, y_dim, x_dim])
+ coords[z_dim] = np.array(index_values)
+ da = xr.DataArray(data=data,
coords=coords, dims=[z_dim, y_dim, x_dim], name=name) + if index_units != "": + da.attrs[z_dim + "_units"] = index_units return da @@ -265,10 +267,19 @@ def empty_data_array( height: int, transform: Affine, crs: str = "EPSG:4326", - index_values: List[float] = [0], + index_name: str = "index", + index_units: str = "", + index_values: List[Any] = [0], ): data = dask.array.empty(shape=[len(index_values), height, width]) - return data_array(data, transform, crs, index_values) + return data_array( + data, + transform, + crs, + index_name=index_name, + index_units=index_units, + index_values=index_values, + ) def write_array(array: xr.DataArray, path: str): diff --git a/src/inventories/hazard/inventory.json b/src/inventories/hazard/inventory.json index 625577e..3e47687 100644 --- a/src/inventories/hazard/inventory.json +++ b/src/inventories/hazard/inventory.json @@ -49,6 +49,7 @@ "index_values": null, "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "historical", @@ -110,6 +111,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -181,6 +183,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -252,6 +255,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -323,6 +327,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -394,6 +399,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -465,6 +471,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "historical", @@ -526,6 +533,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -597,6 +605,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -668,6 +677,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -739,6 +749,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "historical", @@ -800,6 +811,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -871,6 +883,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -942,6 +955,7 @@ ], "source": "map_array_pyramid" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "rcp4p5", @@ -1022,6 +1036,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "historical", @@ -1105,6 +1120,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "ssp126", @@ -1180,6 +1196,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "ssp126", @@ -1255,6 +1272,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "ssp126", @@ -1330,6 +1348,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "ssp126", @@ -1405,6 +1424,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "ssp126", @@ -1480,6 +1500,7 @@ "index_values": null, "source": "map_array" }, + "store_netcdf_coords": false, "scenarios": [ { "id": "ssp126", @@ -1555,6 +1576,7 
@@ "index_values": null, "source": "map_array" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "ssp126",
@@ -1646,6 +1668,7 @@ "index_values": null, "source": "map_array" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -1749,6 +1772,7 @@ "index_values": null, "source": "map_array" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -1832,6 +1856,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -1924,6 +1949,7 @@ ], "source": "map_array" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2022,6 +2048,7 @@ ], "source": "map_array" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2131,6 +2158,7 @@ ], "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2260,6 +2288,7 @@ ], "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2343,6 +2372,7 @@ ], "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2452,6 +2482,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2537,6 +2568,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2622,6 +2654,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2707,6 +2740,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2792,6 +2826,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2877,6 +2912,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -2979,6 +3015,7 @@ ], "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "ssp585",
@@ -3020,6 +3057,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -3071,6 +3109,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
@@ -3095,6 +3134,82 @@ ], "units": "years" },
+ {
+ "hazard_type": "RiverineInundation",
+ "group_id": "",
+ "path": "inundation/flopros_riverine/v1/flood_sop/sop",
+ "indicator_id": "flood_sop",
+ "indicator_model_id": "flopros",
+ "indicator_model_gcm": "",
+ "params": {},
+ "display_name": "Standard of protection (FLOPROS)",
+ "display_groups": [],
+ "description": "Global data set of protection levels for riverine and coastal flood, based on the [FLOPROS database](https://nhess.copernicus.org/articles/16/1049/2016/). For each of coastal and riverine inundation, the dataset provides for every location a minimum (\"min\") and maximum (\"max\") protection level, specified as a return period in years; the min and max reflect the uncertainty in the protection level. The return period indicates that the location is protected against flood events with that return period (e.g. 100 years indicates that the location is protected against 1-in-100 year flood events). Finding the equivalent flood depth protection level additionally requires a flood depth indicator data set.\n",
+ "map": {
+ "colormap": {
+ "min_index": 1,
+ "min_value": 0.0,
+ "max_index": 255,
+ "max_value": 1500.0,
+ "name": "flare",
+ "nodata_index": 0,
+ "units": "years"
+ },
+ "path": "maps/inundation/flopros_riverine/v1/flood_sop_map",
+ "bounds": [],
+ "bbox": [],
+ "index_values": null,
+ "source": "map_array_pyramid"
+ },
+ "store_netcdf_coords": true,
+ "scenarios": [ { "id": "historical", "years": [ 1985 ] } ],
+ "units": "years"
+ },
+ {
+ "hazard_type": "CoastalInundation",
+ "group_id": "",
+ "path": "inundation/flopros_coastal/v1/flood_sop/sop",
+ "indicator_id": "flood_sop",
+ "indicator_model_id": "flopros",
+ "indicator_model_gcm": "",
+ "params": {},
+ "display_name": "Standard of protection (FLOPROS)",
+ "display_groups": [],
+ "description": "Global data set of protection levels for riverine and coastal flood, based on the [FLOPROS database](https://nhess.copernicus.org/articles/16/1049/2016/). For each of coastal and riverine inundation, the dataset provides for every location a minimum (\"min\") and maximum (\"max\") protection level, specified as a return period in years; the min and max reflect the uncertainty in the protection level. The return period indicates that the location is protected against flood events with that return period (e.g. 100 years indicates that the location is protected against 1-in-100 year flood events). Finding the equivalent flood depth protection level additionally requires a flood depth indicator data set.\n",
+ "map": {
+ "colormap": {
+ "min_index": 1,
+ "min_value": 0.0,
+ "max_index": 255,
+ "max_value": 1500.0,
+ "name": "flare",
+ "nodata_index": 0,
+ "units": "years"
+ },
+ "path": "maps/inundation/flopros_coastal/v1/flood_sop_map",
+ "bounds": [],
+ "bbox": [],
+ "index_values": null,
+ "source": "map_array_pyramid"
+ },
+ "store_netcdf_coords": true,
+ "scenarios": [ { "id": "historical", "years": [ 1985 ] } ],
+ "units": "years"
+ },
{ "hazard_type": "Subsidence", "group_id": "",
@@ -3144,6 +3259,7 @@ "index_values": null, "source": "map_array_pyramid" },
+ "store_netcdf_coords": false,
"scenarios": [ { "id": "historical",
diff --git a/tests/test_inventory.py b/tests/test_inventory.py index d8ff4ac..ae14615 100644 --- a/tests/test_inventory.py +++ b/tests/test_inventory.py
@@ -10,6 +10,7 @@ from hazard.models.water_temp import WaterTemperatureAboveIndicator from hazard.models.wet_bulb_globe_temp import WetBulbGlobeTemperatureAboveIndicator from hazard.models.work_loss import WorkLossIndicator
+from hazard.onboard.flopros_flood import FLOPROSFloodStandardOfProtection
from hazard.onboard.csm_subsidence import DavydzenkaEtAlLandSubsidence from hazard.onboard.iris_wind import IRISIndicator from hazard.onboard.jupiter import Jupiter
@@ -48,6 +49,7 @@ def test_create_inventory(test_output_dir): # noqa: F811 WRIAqueductWaterRisk(), DroughtIndicator(None), TUDelftRiverFlood(None),
+ FLOPROSFloodStandardOfProtection(),
DavydzenkaEtAlLandSubsidence(None), ]
diff --git a/tests/test_onboarding.py b/tests/test_onboarding.py index 88d6574..415bd82 100644 --- a/tests/test_onboarding.py +++ b/tests/test_onboarding.py
@@ -9,6 +9,10 @@ import zarr import zarr.convenience from hazard.docs_store import DocStore
+from hazard.onboard.flopros_flood import (
+ FLOPROSFloodStandardOfProtection,
+ FLOPROSFloodStandardOfProtectionSource,
+)
from hazard.onboard.rain_european_winter_storm import RAINEuropeanWinterStorm from
hazard.models.water_temp import FutureStreamsSource, WaterTemperatureAboveIndicator from hazard.models.wet_bulb_globe_temp import WetBulbGlobeTemperatureAboveIndicator
@@ -254,7 +258,6 @@ def test_onboard_landslides_jrc(test_output_dir): target = OscZarr(store=store) for batch_item in batch_items: model.run_single(batch_item, None, target, None)
- # model.create_maps(target, target)
@@ -268,7 +271,6 @@ def test_onboard_subsidence_jrc(test_output_dir): target = OscZarr(store=store) for batch_item in batch_items: model.run_single(batch_item, None, target, None)
- # model.create_maps(target, target)
@@ -283,7 +285,6 @@ def test_onboard_fire_tudelft(test_output_dir): for batch_item in batch_items: model.prepare(batch_item) model.run_single(batch_item, None, target, None)
- # model.create_maps(target, target)
@@ -298,7 +299,6 @@ def test_onboard_conv_wind_tudelft(test_output_dir): for batch_item in batch_items: model.prepare(batch_item) model.run_single(batch_item, None, target, None)
- # model.create_maps(target, target)
@@ -313,10 +313,20 @@ def test_onboard_coastalflood_tudelft(test_output_dir): target = OscZarr(store=store) for batch_item in batch_items: model.run_single(batch_item, None, target, None)
- # model.create_maps(target, target)
+@pytest.mark.skip(reason="on-boarding script")
+def test_onboard_flopros(test_output_dir):
+ source_path = os.path.join(test_output_dir, "flopros")
+ source = FLOPROSFloodStandardOfProtectionSource(source_path)
+ model = FLOPROSFloodStandardOfProtection()
+ store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr"))
+ target = OscZarr(store=store, write_xarray_compatible_zarr=True)
+ model.run_all(source, target)
+ model.create_maps(target, target)
+
+
@pytest.mark.skip(reason="on-boarding script") def test_litpop(test_output_dir): store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr"))
diff --git a/tests/test_utilities.py b/tests/test_utilities.py index 0c41f4e..fce9d9d 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py
@@ -1,6 +1,8 @@ import os
+from typing import List, Optional
import numpy as np # type: ignore
+import xarray as xr
import zarr # type: ignore from hazard.sources.osc_zarr import OscZarr
@@ -26,3 +28,56 @@ def test_xarray_write_small(test_output_dir): # noqa: F811 # target = OscZarr() target = OscZarr(store=store) target.write("test/test_small", da)
+
+
+def test_xarray_write_net_cdf_coords(test_output_dir): # noqa: F811
+ """Test writing of XArrays with NetCDF-style coordinates.
+ Hazard indicators are generally maps of values, i.e. have two (spatial) dimensions, but it is often convenient
+ to group a set of indicators (for a given scenario and year): for example, flood depths with different return
+ periods, or chronic indicators with different temperature thresholds. We then want to be able to look up maps using
+ an index. To this end, an 'index' coordinate is added to the array.
+ Physrisk requires an array with dimensions
+ [index, "y", "x"] or
+ [index, "latitude", "longitude"];
+ the index co-ordinate could be, for example, "temperature" or "flood_depth".
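+ (With the array written below, a single map can then be selected with, for example, da.sel(min_max="max").)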
+ """
+ _, affine = global_crs_transform(3600, 1800)
+ x = np.linspace(0, 1, 3600)
+ y = np.linspace(0, 1, 1800)
+ xx, yy = np.meshgrid(x, y)
+ data = np.empty((2, 1800, 3600))
+ data[0, :, :] = np.sin(xx) * np.sin(yy)
+ data[1, :, :] = np.sin(xx) * np.sin(yy)
+ da = data_array( data, affine, index_name="min_max", index_values=["min", "max"], name="sop" )
+ store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr"))
+ # to write to the dev bucket, use simply
+ # target = OscZarr()
+ target = OscZarr(store=store, write_xarray_compatible_zarr=True)
+ target.write("test/test_netcdf_coords/sop", da, spatial_coords=True)
+
+ z = target.read_zarr("test/test_netcdf_coords/sop")
+ assert z.attrs["dimensions"][0] == "min_max"
+ assert z.attrs["min_max_values"] == ["min", "max"]
+ assert z.attrs["min_max_units"] == ""
+
+
+def test_xarray_write_net_cdf_coords_huge(test_output_dir): # noqa: F811
+ """Placeholder: writing very large arrays may require distributed writes; see
+ https://docs.xarray.dev/en/stable/user-guide/io.html
+
+ Writing a large empty array requires a particular approach (to be added to the protocol?),
+ for example (a sketch follows at the end of this file):
+ da.to_dataset(name="flood_sop").to_zarr(store=store,
+ group="inundation/flopros_riverine/v1/flood_sop",
+ compute=False,
+ mode="w",
+ encoding={"flood_sop" : {
+ "chunks" : (1, 1000, 1000),
+ "write_empty_chunks": False,
+ }
+ })
+ """
+ ...
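+
+
+# A minimal sketch (not a test) of the distributed-write approach referenced in the docstring
+# above. The group path, sizes and chunking here are illustrative assumptions: to_zarr(...,
+# compute=False) writes metadata only, after which chunk-aligned slices can be written with region=.
+def _sketch_distributed_zarr_write(store):
+    import dask.array
+
+    data = dask.array.zeros((2, 1800, 3600), chunks=(1, 1000, 1000))
+    ds = xr.DataArray(
+        data,
+        dims=["min_max", "latitude", "longitude"],
+        coords={
+            "min_max": ["min", "max"],
+            "latitude": np.linspace(89.95, -89.95, 1800),
+            "longitude": np.linspace(-179.95, 179.95, 3600),
+        },
+        name="flood_sop",
+    ).to_dataset()
+    group = "inundation/flopros_riverine/v1/flood_sop"
+    # initialize the store: metadata only, no chunk data is computed or written here
+    ds.to_zarr(
+        store,
+        group=group,
+        compute=False,
+        mode="w",
+        encoding={"flood_sop": {"chunks": (1, 1000, 1000), "write_empty_chunks": False}},
+    )
+    # then write one chunk-aligned region at a time (e.g. from separate workers)
+    ds.isel(latitude=slice(0, 1000)).to_zarr(
+        store,
+        group=group,
+        region={
+            "min_max": slice(0, 2),
+            "latitude": slice(0, 1000),
+            "longitude": slice(0, 3600),
+        },
+    )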