From 02209eb9f33aa5f2098da2414f036f937f1201cd Mon Sep 17 00:00:00 2001 From: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> Date: Mon, 8 Apr 2024 21:41:13 +0100 Subject: [PATCH] Fast generation of map tiles where map covers only part of globe (#54) * Working version --------- Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> --- src/hazard/onboard/ethz_litpop.py | 13 +- src/hazard/onboard/tudelft_flood.py | 82 ++++--- src/hazard/sources/osc_zarr.py | 2 + src/hazard/utilities/s3_utilities.py | 44 ++++ src/hazard/utilities/tiles.py | 284 +++++++++++++++++------ src/hazard/utilities/xarray_utilities.py | 2 + src/inventories/hazard/inventory.json | 50 ++++ tests/inventory_test.py | 2 + tests/onboarding_test.py | 23 +- tests/tile_creation_test.py | 108 +++++---- 10 files changed, 467 insertions(+), 143 deletions(-) diff --git a/src/hazard/onboard/ethz_litpop.py b/src/hazard/onboard/ethz_litpop.py index e7c1e96..ca98657 100644 --- a/src/hazard/onboard/ethz_litpop.py +++ b/src/hazard/onboard/ethz_litpop.py @@ -73,6 +73,7 @@ def prepare(self, working_dir: Optional[str] = None): if not isinstance(self.fs, LocalFileSystem): # e.g. we are copying to S3; download to specified working directory, # but then copy to self.source_dir + assert working_dir is not None download_and_unzip(self.zip_url, working_dir, self.zip_url.split("/")[-1].split(".")[0]) for file in os.listdir(working_dir): with open(file, "rb") as f: @@ -159,10 +160,11 @@ def _resources(self) -> Dict[str, HazardResource]: hazard_type="SpatialDistribution", indicator_id=key, indicator_model_gcm="", + indicator_model_id=None, path=path, params={}, - display_name=resource_map[key]["display_name"], - description=resource_map[key]["description"], + display_name=str(resource_map[key]["display_name"]), + description=str(resource_map[key]["description"]), group_id="", display_groups=[], map=MapInfo( @@ -178,13 +180,14 @@ def _resources(self) -> Dict[str, HazardResource]: nodata_index=0, name="heating", min_value=0.0, - max_value=resource_map[key]["max_value"], - units=resource_map[key]["units"], + max_value=resource_map[key]["max_value"], # type:ignore + units=resource_map[key]["units"], # type:ignore ), + index_values=None, path="maps/" + path + "_map", source="map_array_pyramid", ), - units=resource_map[key]["units"], + units=str(resource_map[key]["units"]), scenarios=[ Scenario(id="historical", years=[2014]), ], diff --git a/src/hazard/onboard/tudelft_flood.py b/src/hazard/onboard/tudelft_flood.py index 6146348..861ada0 100644 --- a/src/hazard/onboard/tudelft_flood.py +++ b/src/hazard/onboard/tudelft_flood.py @@ -4,6 +4,7 @@ from pathlib import PurePosixPath from typing import Any, Iterable, Optional +import numpy as np import rioxarray # noqa: F401 import xarray as xr from dask.distributed import Client @@ -24,7 +25,8 @@ class BatchItem: scenario: str central_year: int - input_dataset_filename: str + flood_depth_filename: str + extent_protected_filename: str class TUDelftRiverFlood(IndicatorModel[BatchItem]): @@ -72,31 +74,37 @@ def __init__(self, source_dir: str, fs: Optional[AbstractFileSystem] = None): self._resource = list(self.inventory())[0] def batch_items(self) -> Iterable[BatchItem]: + return [ BatchItem( scenario="historical", central_year=1971, - input_dataset_filename="River_flood_depth_1971_2000_hist_{return_period}.tif", + flood_depth_filename="River_flood_depth_1971_2000_hist_{return_period}.tif", + extent_protected_filename="River_flood_extent_1971_2000_hist_with_protection.tif", ), 
BatchItem( scenario="rcp4p5", central_year=2035, - input_dataset_filename="River_flood_depth_2021_2050_RCP45_{return_period}.tif", + flood_depth_filename="River_flood_depth_2021_2050_RCP45_{return_period}.tif", + extent_protected_filename="River_flood_extent_2021_2050_RCP45_with_protection.tif", ), BatchItem( scenario="rcp8p5", central_year=2035, - input_dataset_filename="River_flood_depth_2021_2050_RCP85_{return_period}.tif", + flood_depth_filename="River_flood_depth_2021_2050_RCP85_{return_period}.tif", + extent_protected_filename="River_flood_extent_2021_2050_RCP85_with_protection.tif", ), BatchItem( scenario="rcp4p5", central_year=2085, - input_dataset_filename="River_flood_depth_2071_2100_RCP45_{return_period}.tif", + flood_depth_filename="River_flood_depth_2071_2100_RCP45_{return_period}.tif", + extent_protected_filename="River_flood_extent_2071_2100_RCP45_with_protection.tif", ), BatchItem( scenario="rcp8p5", central_year=2085, - input_dataset_filename="River_flood_depth_2071_2100_RCP85_{return_period}.tif", + flood_depth_filename="River_flood_depth_2071_2100_RCP85_{return_period}.tif", + extent_protected_filename="River_flood_extent_2071_2100_RCP85_with_protection.tif", ), ] @@ -115,34 +123,54 @@ def prepare(self, working_dir: Optional[str] = None): def run_single(self, item: BatchItem, source: Any, target: ReadWriteDataArray, client: Client): assert isinstance(target, OscZarr) - input = PurePosixPath(self.source_dir, item.input_dataset_filename) + full_path_depth_format = PurePosixPath(self.source_dir, item.flood_depth_filename) + full_path_extent = PurePosixPath(self.source_dir, item.extent_protected_filename) assert target is None or isinstance(target, OscZarr) shape = [39420, 38374] # y, x not all returns have same size (first one smaller at 38371) for i, return_period in enumerate(self.return_periods): - filename = str(input).format(return_period=self.return_period_str[return_period]) - with self.fs.open(filename, "rb") as f: - da = xr.open_rasterio(f).isel(band=0) - # bounds = da.rio.bounds() - if return_period == self.return_periods[0]: - z = target.create_empty( - self._resource.path.format(scenario=item.scenario, year=item.central_year), - shape[1], - shape[0], - da.rio.transform(), - str(da.crs), - indexes=self.return_periods, - ) - values = da.data - no_data = da.attrs["nodatavals"] - values[values == no_data] = float("nan") - z[i, 0 : len(da.y), 0 : len(da.x)] = values[:, :] + full_path_depth = str(full_path_depth_format).format(return_period=self.return_period_str[return_period]) + with self.fs.open(full_path_depth, "rb") as fd: + dad = xr.open_rasterio(fd).isel(band=0) + with self.fs.open(full_path_extent, "rb") as fe: + dae = xr.open_rasterio(fe).isel(band=0) + # bounds = da.rio.bounds() + if return_period == self.return_periods[0]: + z = target.create_empty( + self._resource.path.format(scenario=item.scenario, year=item.central_year), + shape[1], + shape[0], + dad.rio.transform(), + str(dad.crs), + indexes=self.return_periods, + ) + # dad_nodata = 65535 + if ( + dad.shape[1] == 38371 + ): # corrections for various possible errors whereby coordinates are missing for certain files: + dae = dae[:, 0:38371] + if dae.shape[1] == 38375: + dae = dae[:, 0:38374] + if dad.shape[1] == 38375: + dad = dad[:, 0:38374] + if dae.shape[0] == 39385: + dad = dad[35:, :] + + # not quite the same coordinates: check if close, within rounding error, and align exactly + assert np.abs(np.array(dae.x) - np.array(dad.x)).max() < 1e-4 + assert np.abs(np.array(dae.y) - np.array(dad.y)).max() < 
1e-4 + dae = dae.assign_coords({"x": dad.x, "y": dad.y}) + da_combined = xr.where(dae <= return_period, dad, 0) + values = da_combined.data + depth_no_data = dad.attrs["nodatavals"] + values[values == depth_no_data] = float("nan") + z[i, 0 : len(da_combined.y), 0 : len(da_combined.x)] = values[:, :] def create_maps(self, source: OscZarr, target: OscZarr): """ Create map images. """ ... - create_tiles_for_resource(source, target, self._resource) + create_tiles_for_resource(source, target, self._resource, max_zoom=10) def inventory(self) -> Iterable[HazardResource]: """Get the (unexpanded) HazardModel(s) that comprise the inventory.""" @@ -182,8 +210,8 @@ def inventory(self) -> Iterable[HazardResource]: units="metres", scenarios=[ Scenario(id="historical", years=[1971]), - Scenario(id="rcp45", years=[2050, 2070]), - Scenario(id="rcp85", years=[2050, 2070]), + Scenario(id="rcp4p5", years=[2035, 2085]), + Scenario(id="rcp8p5", years=[2035, 2085]), ], ) ] diff --git a/src/hazard/sources/osc_zarr.py b/src/hazard/sources/osc_zarr.py index 00c3258..9f48417 100644 --- a/src/hazard/sources/osc_zarr.py +++ b/src/hazard/sources/osc_zarr.py @@ -244,6 +244,8 @@ def _zarr_create( chunks=chunks, dtype="f4", overwrite=overwrite, + write_empty_chunks=False, + fill_value=float("nan"), ) # array_path interpreted as path within group if isinstance(indexes, np.ndarray) and indexes.dtype in [ "int16", diff --git a/src/hazard/utilities/s3_utilities.py b/src/hazard/utilities/s3_utilities.py index ff98d03..5ccce4d 100644 --- a/src/hazard/utilities/s3_utilities.py +++ b/src/hazard/utilities/s3_utilities.py @@ -1,5 +1,7 @@ import logging import os +import pathlib +import sys from typing import Callable, Optional, Sequence import boto3 @@ -7,6 +9,48 @@ logger = logging.getLogger(__name__) +def copy_local_to_dev(zarr_dir: str, array_path: str, dry_run=False): + """Copy zarr array from a local directory to the development bucket. + Requires environment variables: + OSC_S3_BUCKET_DEV=physrisk-hazard-indicators-dev01 + OSC_S3_ACCESS_KEY_DEV=... + OSC_S3_SECRET_KEY_DEV=... + + Args: + zarr_dir (str): Directory of the Zarr group, i.e. //hazard/hazard.zarr. + array_path (str): The path of the array within the group. + dry_run (bool, optional): If True, log the action that would be taken without actually executing. Defaults to False. + """ + + logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler(filename="batch.log"), + logging.StreamHandler(sys.stdout), + ], + ) + + s3_target_client = boto3.client( + "s3", + aws_access_key_id=os.environ["OSC_S3_ACCESS_KEY_DEV"], + aws_secret_access_key=os.environ["OSC_S3_SECRET_KEY_DEV"], + ) + target_bucket_name = os.environ["OSC_S3_BUCKET_DEV"] + logger.info(f"Source path {zarr_dir}; target bucket {target_bucket_name}") + + files = [f for f in pathlib.Path(zarr_dir, array_path).iterdir() if f.is_file()] + logger.info(f"Copying {len(files)} files") + if not dry_run: + for i, file in enumerate(files): + with open(file, "rb") as f: + data = f.read() + target_key = str(pathlib.PurePosixPath("hazard", "hazard.zarr", array_path, file.name)) + s3_target_client.put_object(Body=data, Bucket=target_bucket_name, Key=target_key) + if i % 100 == 0: + logger.info(f"Completed {i}/{len(files)}") + + def copy_dev_to_prod(prefix: str, dry_run=False): """Use this script to copy files with the prefix specified from dev S3 to prod S3. 
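
Note on the osc_zarr.py change above: fill_value=float("nan") together with
write_empty_chunks=False is what makes whole-globe Zarr pyramids cheap when the
map covers only part of the globe, since chunks that remain entirely at the fill
value are never materialised in storage. A minimal sketch of that behaviour
(store path and array path are illustrative, not taken from this patch):

    import numpy as np
    import zarr

    store = zarr.DirectoryStore("example.zarr")  # illustrative path
    z = zarr.create(
        shape=(1, 256 * 2**10, 256 * 2**10),  # whole world at zoom 10
        chunks=(1, 512, 512),
        dtype="f4",
        fill_value=float("nan"),  # untouched regions read back as NaN
        write_empty_chunks=False,  # all-NaN chunks are never written to the store
        store=store,
        path="maps/example_map/10",  # illustrative array path
        overwrite=True,
    )
    # write a small region; only the single chunk actually touched is persisted
    z[0, 0:512, 0:512] = np.random.rand(512, 512).astype("f4")
    print(f"{z.nchunks_initialized} of {z.nchunks} chunks stored")  # 1 of 262144
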
diff --git a/src/hazard/utilities/tiles.py b/src/hazard/utilities/tiles.py index 44b07fc..7163941 100644 --- a/src/hazard/utilities/tiles.py +++ b/src/hazard/utilities/tiles.py @@ -2,11 +2,15 @@ import math import os import posixpath -from typing import Optional, Sequence +from typing import Optional, Sequence, Tuple import mercantile import numpy as np # type: ignore import rasterio # type: ignore +import rasterio.coords +import rasterio.transform +import rasterio.warp +import rioxarray # noqa: F401 import xarray as xr from rasterio import CRS # type: ignore from rasterio.warp import Resampling # type: ignore @@ -18,7 +22,9 @@ logger = logging.getLogger(__name__) -def create_tiles_for_resource(source: OscZarr, target: OscZarr, resource: HazardResource): +def create_tiles_for_resource( + source: OscZarr, target: OscZarr, resource: HazardResource, max_zoom: Optional[int] = None +): if resource.map is None or resource.map.source != "map_array_pyramid": raise ValueError("resource does not specify 'map_array_pyramid' map source.") indices = None @@ -33,10 +39,7 @@ def create_tiles_for_resource(source: OscZarr, target: OscZarr, resource: Hazard da = source.read(path) indexes = list(da["index"].values) indices = [indexes.index(v) for v in resource.map.index_values] - create_tile_set(source, path, target, map_path, indices=indices) - - -# def create_image_set_for_resource(source: OscZarr, target: OscZarr, resource: HazardResource): + create_tile_set(source, path, target, map_path, indices=indices, max_zoom=max_zoom) def create_image_set( @@ -121,8 +124,9 @@ def create_tile_set( target: OscZarr, target_path: str, indices: Optional[Sequence[int]] = None, - max_tile_batch_size=64, + max_tile_batch_size=32, reprojection_threads=8, + max_zoom: Optional[int] = None, nodata=None, nodata_as_zero=False, check_fill=False, @@ -154,38 +158,72 @@ def create_tile_set( _, _, src_width = ( da.sizes["index"], - da.sizes["latitude"], - da.sizes["longitude"], + da.sizes["latitude"] if "latitude" in da.sizes else da.sizes["y"], + da.sizes["longitude"] if "longitude" in da.sizes else da.sizes["x"], ) - dst_crs = CRS.from_epsg(3857) - - max_level = int(round(math.log2(src_width / 256.0))) + chunk_size: int = 512 pixels_per_tile = 256 - chunk_size = 512 + os.environ["CHECK_WITH_INVERT_PROJ"] = "YES" + + # to calculate the maximum zoom level we find the bounds in the map projection + # and calculate the number of pixels across full x range + left, bottom, right, top = da.rio.transform_bounds( + "EPSG:4326" + ) # left, bottom, right, top or west, south, east, north + ulx, uly = mercantile.xy(left, top) + lrx, lry = mercantile.xy(right, bottom) + ulx_whole, uly_whole = mercantile.xy(*mercantile.ul(mercantile.Tile(0, 0, 0))) + lrx_whole, lry_whole = mercantile.xy(*mercantile.ul(mercantile.Tile(1, 1, 0))) + whole_map_pixels_x = src_width * (lrx_whole - ulx_whole) / (lrx - ulx) + max_zoom = int(round(math.log2(whole_map_pixels_x / pixels_per_tile))) if max_zoom is None else max_zoom + if indices is None: index_slice = slice(-1, None) indices = range(len(return_periods))[index_slice] logger.info(f"Starting map tiles generation for array {source_path}.") - max_dimension = 2**max_level * pixels_per_tile logger.info(f"Source array size is {da.shape} (z, y, x).") logger.info(f"Indices (z) subset to be processed: {indices}.") - logger.info(f"Maximum zoom is level is {max_level}, i.e. 
of size ({max_dimension}, {max_dimension}) pixels].")
+    logger.info(f"Maximum zoom level is {max_zoom}.")
+    # _coarsen(target, target_path, max_zoom, (left, bottom, right, top), indices)
     target.remove(target_path)
+    _create_empty_tile_pyramid(target, target_path, max_zoom, chunk_size, return_periods)
-    os.environ["CHECK_WITH_INVERT_PROJ"] = "YES"
-    for level in range(max_level, 0, -1):
-        logger.info(f"Starting level {level}.")
-        level_path = posixpath.join(target_path, f"{level}")
+    # here we reproject and write the maximum zoom level
+    _write_zoom_level(
+        da,
+        target,
+        target_path,
+        max_zoom,
+        indices,
+        max_tile_batch_size=max_tile_batch_size,
+        reprojection_threads=reprojection_threads,
+        nodata=nodata,
+        nodata_as_zero=nodata_as_zero,
+        check_fill=check_fill,
+    )
-        tiles = 2**level
+    # and then progressively coarsen each level and write until we reach level 0
+    _coarsen(target, target_path, max_zoom, (left, bottom, right, top), indices)
+
+
+def _create_empty_tile_pyramid(
+    target: OscZarr, target_path: str, max_zoom: int, chunk_size: int, return_periods: Sequence[float]
+):
+    pixels_per_tile: int = 256
+    ulx, uly = mercantile.xy(*mercantile.ul(mercantile.Tile(x=0, y=0, z=0)))
+    lrx, lry = mercantile.xy(*mercantile.ul(mercantile.Tile(x=1, y=1, z=0)))
+    dst_crs = CRS.from_epsg(3857)
+    for zoom in range(max_zoom, 0, -1):
+        tiles = 2**zoom
         dst_dim = tiles * pixels_per_tile
-        ulx, uly = mercantile.xy(*mercantile.ul(mercantile.Tile(x=0, y=0, z=level)))
-        lrx, lry = mercantile.xy(*mercantile.ul(mercantile.Tile(x=tiles, y=tiles, z=level)))
+        zoom_level_path = posixpath.join(target_path, f"{zoom}")
         whole_map_transform = rasterio.transform.from_bounds(ulx, lry, lrx, uly, dst_dim, dst_dim)
         # i.e. chunks are //.., e.g. flood/4/4.2.3
+        # we create an array for the whole world, but if the map covers just a fraction then all other
+        # chunks will be empty
         _ = target.create_empty(
-            level_path,
+            zoom_level_path,
             dst_dim,
             dst_dim,
             whole_map_transform,
@@ -194,59 +232,173 @@ def create_tile_set(
             chunks=(1, chunk_size, chunk_size),
         )

-        num_batches = max(1, 2**level // max_tile_batch_size)
-        tile_batch_size = min(2**level, max_tile_batch_size)
-        for index in indices:
-            logger.info(f"Starting index {index}.")
-            da_index = da[index, :, :]  # .compute()
-            if nodata_as_zero:
-                da_index.data[np.isnan(da_index.data)] = 0
-            if nodata:
-                da_index.data[da_index.data == nodata] = 0
+
+def _write_zoom_level(
+    da: xr.DataArray,
+    target: OscZarr,
+    target_path: str,
+    zoom: int,
+    indices: Optional[Sequence[int]] = None,
+    max_tile_batch_size: int = 32,
+    reprojection_threads: int = 8,
+    nodata=None,
+    nodata_as_zero=False,
+    check_fill=False,
+):
+    logger.info(f"Starting writing of level {zoom}.")
+    pixels_per_tile: int = 256
+
+    left, bottom, right, top = da.rio.transform_bounds("EPSG:4326")
+    # we find just the tiles that contain the map area
+    xmin, xmax, ymin, ymax = get_tile_bounds(left, bottom, right, top, zoom)
+    dst_crs = CRS.from_epsg(3857)
+    zoom_level_path = posixpath.join(target_path, f"{zoom}")
+    ntiles_in_level = 2**zoom
+    ntiles = max(xmax - xmin + 1, ymax - ymin + 1)
+    num_batches = max(1, math.ceil(ntiles / max_tile_batch_size))
+    tile_batch_size = min(ntiles, max_tile_batch_size)
+    for index in indices if indices is not None else [0]:
+        logger.info(f"Starting index {index}.")
+        da_index = da[index, :, :]  # .compute()
+        if nodata_as_zero:
+            da_index.data[np.isnan(da_index.data)] = 0
+        if nodata:
+            da_index.data[da_index.data == nodata] = 0
+        for batch_x in range(0, num_batches):
+            for batch_y in range(0, num_batches):
+                x_slice = slice(
+                    xmin + batch_x * tile_batch_size, min(xmin + (batch_x + 1) * tile_batch_size, ntiles_in_level)
+                )
+                y_slice = slice(
+                    ymin + batch_y * tile_batch_size, min(ymin + (batch_y + 1) * tile_batch_size, ntiles_in_level)
+                )
+                ulx, uly = mercantile.xy(*mercantile.ul(mercantile.Tile(x=x_slice.start, y=y_slice.start, z=zoom)))
+                lrx, lry = mercantile.xy(*mercantile.ul(mercantile.Tile(x=x_slice.stop, y=y_slice.stop, z=zoom)))
+                logger.info(f"Processing batch ({batch_x}/{num_batches}, {batch_y}/{num_batches}).")
+                dst_transform = rasterio.transform.from_bounds(
+                    ulx,
+                    lry,
+                    lrx,
+                    uly,
+                    (y_slice.stop - y_slice.start) * pixels_per_tile,
+                    (x_slice.stop - x_slice.start) * pixels_per_tile,
+                )
+                # at this point, we could take just the portion of the source
+                # that covers the batch of tiles
+                # calculate bounds of tiles in source CRS using rasterio.warp.transform_bounds
+                da_trimmed = trim_array(da_index, dst_crs, ulx, lry, lrx, uly)
+                if da_trimmed is None:
+                    # no overlap with batch
+                    continue
+
+                da_trimmed.attrs["nodata"] = float("nan")
+
+                da_m = da_trimmed.rio.reproject(
+                    "EPSG:3857",
+                    resampling=Resampling.bilinear,
+                    shape=(
+                        (y_slice.stop - y_slice.start) * pixels_per_tile,
+                        (x_slice.stop - x_slice.start) * pixels_per_tile,
+                    ),
+                    transform=dst_transform,
+                    num_threads=reprojection_threads,
+                    nodata=nodata,
+                )
+
+                logger.info(f"Reprojection complete. Writing to target {zoom_level_path}.")
+
+                if check_fill:
+                    da_m.data = xr.where(da_m.data > 3.4e38, np.nan, da_m.data)
+
+                target.write_slice(
+                    zoom_level_path,
+                    slice(index, index + 1),
+                    slice(
+                        y_slice.start * pixels_per_tile,
+                        y_slice.stop * pixels_per_tile,
+                    ),
+                    slice(
+                        x_slice.start * pixels_per_tile,
+                        x_slice.stop * pixels_per_tile,
+                    ),
+                    da_m.data,
+                )
+
+                logger.info("Batch complete.")
+
+
+def get_tile_bounds(left: float, bottom: float, right: float, top: float, zoom: int):
+    ul = mercantile.tile(left, top, zoom)
+    lr = mercantile.tile(right, bottom, zoom)
+    return ul.x, lr.x, ul.y, lr.y
+
+
+def _coarsen(
+    target: OscZarr,
+    target_path: str,
+    max_zoom: int,
+    bounds: Tuple[float, float, float, float],
+    indices: Optional[Sequence[int]],
+    max_tile_batch_size: int = 16,
+):
+    pixels_per_tile = 256
+    bbox = rasterio.coords.BoundingBox(*bounds)
+    for index in indices if indices is not None else [0]:
+        for zoom in range(max_zoom, 1, -1):
+            current_zoom_level_path = posixpath.join(target_path, f"{zoom}")
+            next_zoom_level_path = posixpath.join(target_path, f"{zoom - 1}")
+            z = target.read_zarr(current_zoom_level_path)
+            # we go down a zoom level to find the bounds
+            xmin, xmax, ymin, ymax = get_tile_bounds(bbox.left, bbox.bottom, bbox.right, bbox.top, zoom - 1)
+            xmin, ymin = xmin * 2, ymin * 2
+            xmax, ymax = xmax * 2 + 1, ymax * 2 + 1
+            ntiles_in_level = 2**zoom
+            ntiles = max(xmax - xmin + 1, ymax - ymin + 1)
+            num_batches = max(1, math.ceil(ntiles / max_tile_batch_size))
+            tile_batch_size = min(ntiles, max_tile_batch_size)
+            # in this case we can process the batches in parallel: consider multi-threading this part?
for batch_x in range(0, num_batches): for batch_y in range(0, num_batches): - x_slice = slice(batch_x * tile_batch_size, (batch_x + 1) * tile_batch_size) - y_slice = slice(batch_y * tile_batch_size, (batch_y + 1) * tile_batch_size) - ulx, uly = mercantile.xy(*mercantile.ul(mercantile.Tile(x=x_slice.start, y=y_slice.start, z=level))) - lrx, lry = mercantile.xy(*mercantile.ul(mercantile.Tile(x=x_slice.stop, y=y_slice.stop, z=level))) - logger.info(f"Processing batch ({batch_x}/{num_batches}, {batch_y}/{num_batches}).") - dst_transform = rasterio.transform.from_bounds( - ulx, - lry, - lrx, - uly, - pixels_per_tile * tile_batch_size, - pixels_per_tile * tile_batch_size, + x_slice = slice( + xmin + batch_x * tile_batch_size, min(xmin + (batch_x + 1) * tile_batch_size, ntiles_in_level) ) - # if necessary per batch we could operate on just a chunk of da - da_m = da_index.rio.reproject( - "EPSG:3857", - resampling=Resampling.bilinear, - shape=( - pixels_per_tile * tile_batch_size, - pixels_per_tile * tile_batch_size, - ), - transform=dst_transform, - num_threads=reprojection_threads, - nodata=nodata, + y_slice = slice( + ymin + batch_y * tile_batch_size, min(ymin + (batch_y + 1) * tile_batch_size, ntiles_in_level) ) - logger.info(f"Reprojection complete. Writing to target {level_path}.") - - if check_fill: - da_m.data = xr.where(da_m.data > 3.4e38, np.nan, da_m.data) + zslice = z[ + index, + y_slice.start * pixels_per_tile : y_slice.stop * pixels_per_tile, + x_slice.start * pixels_per_tile : x_slice.stop * pixels_per_tile, + ] + # use Zarr array directly, as opposed to e.g.: + # da_slice = da_slice.coarsen(x=2, y=2).mean() # type:ignore + zslice = (zslice[::2, ::2] + zslice[1::2, ::2] + zslice[::2, 1::2] + zslice[1::2, 1::2]) / 4 target.write_slice( - level_path, + next_zoom_level_path, slice(index, index + 1), slice( - y_slice.start * pixels_per_tile, - y_slice.stop * pixels_per_tile, + int(y_slice.start * pixels_per_tile / 2), + int(y_slice.stop * pixels_per_tile / 2), ), slice( - x_slice.start * pixels_per_tile, - x_slice.stop * pixels_per_tile, + int(x_slice.start * pixels_per_tile / 2), + int(x_slice.stop * pixels_per_tile / 2), ), - da_m.data, + zslice, ) - logger.info("Batch complete.") + +def trim_array(da_index: xr.DataArray, crs, left, bottom, right, top): + da_left, da_bottom, da_right, da_top = rasterio.warp.transform_bounds( + crs, da_index.rio.crs, left, bottom, right, top + ) + # find pixels required in order to add a pixel buffer + x = np.where((da_index.x >= da_left) & (da_index.x <= da_right))[0] + y = np.where((da_index.y >= da_bottom) & (da_index.y <= da_top))[0] + if len(x) == 0 or len(y) == 0: + return None + # add a 2 pixel buffer + xmin, xmax = max(0, x[0] - 2), min(len(da_index.x), x[-1] + 2) + ymin, ymax = max(0, y[0] - 2), min(len(da_index.y), y[-1] + 2) + return da_index[ymin:ymax, xmin:xmax] diff --git a/src/hazard/utilities/xarray_utilities.py b/src/hazard/utilities/xarray_utilities.py index bc87b06..b86fedf 100644 --- a/src/hazard/utilities/xarray_utilities.py +++ b/src/hazard/utilities/xarray_utilities.py @@ -4,6 +4,7 @@ import dask.array import numpy as np import rasterio # type: ignore +import rioxarray # noqa: F401 import xarray as xr import zarr # type: ignore import zarr.core @@ -124,6 +125,7 @@ def data_array_from_zarr(z: zarr.core.Array) -> xr.DataArray: da.rio.write_crs(4326, inplace=True) da = da.rename({"dim_0": "index", "dim_1": "latitude", "dim_2": "longitude"}) else: + da.rio.write_crs(crs, inplace=True) da = da.rename({"dim_0": "index", "dim_1": "y", 
"dim_2": "x"}) return da diff --git a/src/inventories/hazard/inventory.json b/src/inventories/hazard/inventory.json index 120e031..ae4b078 100644 --- a/src/inventories/hazard/inventory.json +++ b/src/inventories/hazard/inventory.json @@ -2765,6 +2765,56 @@ } ], "units": "months/year" + }, + { + "hazard_type": "RiverineInundation", + "group_id": "", + "path": "inundation/river_tudelft/v2/flood_depth_{scenario}_{year}", + "indicator_id": "flood_depth", + "indicator_model_id": "tudelft", + "indicator_model_gcm": "CLMcom-CCLM4-8-17-EC-EARTH", + "params": {}, + "display_name": "Flood depth (TUDelft)", + "display_groups": [], + "description": "\nFlood water depth, part of data set containing data related to the probability of\nriver floods occurring in Europe under present and future climate.\nBased upon CLMcom-CCLM4-8-17-EC-EARTH regional\nclimate simulation (EURO-CORDEX).\n ", + "map": { + "colormap": { + "min_index": 1, + "min_value": 0.0, + "max_index": 255, + "max_value": 5.0, + "name": "flare", + "nodata_index": 0, + "units": "metres" + }, + "path": "maps/inundation/river_tudelft/v2/flood_depth_{scenario}_{year}_map", + "bounds": [], + "index_values": null, + "source": "map_array_pyramid" + }, + "scenarios": [ + { + "id": "historical", + "years": [ + 1971 + ] + }, + { + "id": "rcp4p5", + "years": [ + 2035, + 2085 + ] + }, + { + "id": "rcp8p5", + "years": [ + 2035, + 2085 + ] + } + ], + "units": "metres" } ] } \ No newline at end of file diff --git a/tests/inventory_test.py b/tests/inventory_test.py index 12609a1..0eb74f1 100644 --- a/tests/inventory_test.py +++ b/tests/inventory_test.py @@ -12,6 +12,7 @@ from hazard.models.work_loss import WorkLossIndicator from hazard.onboard.iris_wind import IRISIndicator from hazard.onboard.jupiter import Jupiter +from hazard.onboard.tudelft_flood import TUDelftRiverFlood from hazard.onboard.wri_aqueduct_flood import WRIAqueductFlood from hazard.onboard.wri_aqueduct_water_risk import WRIAqueductWaterRisk from hazard.utilities import zarr_utilities # type: ignore @@ -45,6 +46,7 @@ def test_create_inventory(test_output_dir): # noqa: F811 WetBulbGlobeTemperatureAboveIndicator(), WRIAqueductWaterRisk(), DroughtIndicator(None), + TUDelftRiverFlood(None), ] docs_store.write_new_empty_inventory() diff --git a/tests/onboarding_test.py b/tests/onboarding_test.py index 1d431af..a927f61 100644 --- a/tests/onboarding_test.py +++ b/tests/onboarding_test.py @@ -1,5 +1,6 @@ import logging import os +import pathlib import sys from pathlib import PurePosixPath @@ -7,6 +8,7 @@ import pytest import s3fs import zarr +import zarr.convenience from hazard.docs_store import DocStore from hazard.models.water_temp import FutureStreamsSource, WaterTemperatureAboveIndicator @@ -26,6 +28,7 @@ from hazard.sources.osc_zarr import OscZarr from hazard.sources.wri_aqueduct import WRIAqueductSource from hazard.utilities import s3_utilities, zarr_utilities +from hazard.utilities.tiles import create_tile_set @pytest.fixture @@ -160,12 +163,24 @@ def test_onboard_tudelft(s3_credentials, test_output_dir): source_path = os.path.join(test_output_dir, "tudelft", "tudelft_river") model = TUDelftRiverFlood(source_path) model.prepare() - - batch_items = model.batch_items() store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr")) target = OscZarr(store=store) - model.run_single(batch_items[0], None, target, None) - # model.create_maps(target, target) + model.run_all(None, target) + # batch_items = model.batch_items() + # model.run_single(batch_items[4], None, target, 
None) + model.create_maps(target, target) + # path = "inundation/river_tudelft/v2/flood_depth_historical_1971" + # map_path = "maps/inundation/river_tudelft/v2/flood_depth_historical_1971_map" + # create_tile_set(target, path, target, map_path, max_zoom=10) + for i in range(10, 0, -1): + s3_utilities.copy_local_to_dev( + str(pathlib.Path(test_output_dir, "hazard/hazard.zarr")), + f"maps/inundation/river_tudelft/v2/flood_depth_historical_1971_map/{i}", + ) + s3_utilities.copy_local_to_dev( + str(pathlib.Path(test_output_dir, "hazard/hazard.zarr")), + "inundation/river_tudelft/v2/flood_depth_historical_1971", + ) @pytest.mark.skip(reason="on-boarding script") diff --git a/tests/tile_creation_test.py b/tests/tile_creation_test.py index 932efd4..d4f11f6 100644 --- a/tests/tile_creation_test.py +++ b/tests/tile_creation_test.py @@ -5,14 +5,18 @@ import dask import numpy as np import pytest +import rasterio.transform +import rasterio.warp import xarray as xr import zarr.core # type: ignore +from rasterio.warp import Resampling from hazard.indicator_model import IndicatorModel from hazard.models.days_tas_above import DaysTasAboveIndicator from hazard.models.degree_days import DegreeDays from hazard.models.work_loss import WorkLossIndicator from hazard.onboard.jupiter import Jupiter +from hazard.onboard.tudelft_flood import TUDelftRiverFlood from hazard.onboard.wri_aqueduct_flood import WRIAqueductFlood from hazard.sources.osc_zarr import OscZarr from hazard.utilities import zarr_utilities @@ -21,41 +25,73 @@ from .conftest import test_output_dir # noqa: F401 -@pytest.mark.skip(reason="Example not test") -def test_xarray_writing(test_output_dir): # noqa: F811 - # lat = np.arange(90, -90, -0.01) - # lon = np.arange(-180, 180, 0.01) +def test_convert_tiles_mocked(test_output_dir): + """We are combining useful logic from a few sources. + rio_tiler and titiler are very useful and also: + https://github.com/mapbox/rio-mbtiles + https://github.com/carbonplan/ndpyramid/ + (but we need slippy maps) + https://github.com/DHI-GRAS/terracotta + we need to align to: + https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames. 
+ """ - # x = xr.Dataset( - # coords={ - # "lat": (["lat"], lat), - # "lon": (["lon"], lon), - # }, - # data_vars={ - # "dsm": ( - # ["lat", "lon"], - # dask.array.empty((lat.size, lon.size), chunks=(1024, 1024), dtype="uint8"), - # ) - # }, - # ) - store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard_test/test")) - y = xr.open_zarr(store) - y.dsm[0:256, 0:256] = np.random.randn(256, 256) - y.to_zarr() + # create rectangular are in EPSG:3035 and check the projection and + # downscaling of this + # bounds of the rectangular region in latitude / longitude + feat_left, feat_bottom, feat_right, feat_top = 5, 51, 6, 52 + left, bottom, right, top = -10, 40, 30, 55 + step = 0.01 + lon = np.linspace(left + step / 2, right - step / 2, num=int((right - left) / 0.01)) + lat = np.linspace(bottom + step / 2, top - step / 2, num=int((top - bottom) / 0.01)) + da = xr.DataArray( + data=np.zeros((3, len(lat), len(lon))), + coords=dict(index=np.array([0, 1, 2]), y=lat[::-1], x=lon), + attrs=dict(description="Test array"), + ) + np.testing.assert_array_equal(da.rio.bounds(), [-10, 40, 30, 55]) + da.sel(index=2, x=slice(feat_left, feat_right), y=slice(feat_top, feat_bottom))[:, :] = 1.0 + da.rio.write_crs(4326, inplace=True) + t_left, t_bottom, t_right, t_top = rasterio.warp.transform_bounds( + "EPSG:4236", "EPSG:3035", left, bottom, right, top + ) + dst_transform = rasterio.transform.from_bounds(t_left, t_bottom, t_right, t_top, len(lon), len(lat)) + da_3035 = da.rio.reproject( + "EPSG:3035", + shape=(len(lat), len(lon)), + transform=dst_transform, + resampling=Resampling.nearest, + nodata=float("nan"), + ) + da_3035.assign_coords(index=("index", [0, 1, 2])) + assert da_3035[2, :, :].max().values == 1 + local_store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr")) + source = OscZarr(store=local_store) + source.write("test_set/test0", da_3035) + create_tile_set(source, "test_set/test0", source, "test_set/test0_map", indices=[2]) + da_map = source.read("test_set/test0_map/2") + # find the location that should contain the region + m_left, m_bottom, m_right, m_top = rasterio.warp.transform_bounds( + "EPSG:4236", "EPSG:3857", feat_left, feat_bottom, feat_right, feat_top + ) + mapped_region = da_map.sel(index=2, x=slice(m_left, m_right), y=slice(m_top, m_bottom)) + # this should contain our original feature + assert mapped_region.max().values == 1 @pytest.mark.skip(reason="Example not test") def test_map_tiles_from_model(test_output_dir): # noqa: F811 - local_store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard_test", "hazard.zarr")) + local_store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr")) source = OscZarr(store=local_store) target = source models: List[IndicatorModel] = [ - WRIAqueductFlood(), - DegreeDays(), - Jupiter(), - WorkLossIndicator(), - DaysTasAboveIndicator(), + TUDelftRiverFlood(None), + # WRIAqueductFlood(), + # DegreeDays(), + # Jupiter(), + # WorkLossIndicator(), + # DaysTasAboveIndicator(), ] for model in models: resources = model.inventory() @@ -68,30 +104,20 @@ def test_map_tiles_from_model(test_output_dir): # noqa: F811 @pytest.mark.skip(reason="Requires mocking") def test_convert_tiles(test_output_dir): # noqa: F811 - """We are combining useful logic from a few sources. 
- rio_tiler and titiler are very useful and also: - https://github.com/mapbox/rio-mbtiles - https://github.com/carbonplan/ndpyramid/ - (but we need slippy maps) - https://github.com/DHI-GRAS/terracotta - we need to align to: - https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames. - """ - zarr_utilities.set_credential_env_variables() id = "00000NorESM1-M" scenario = "rcp8p5" year = 2050 path = f"inundation/wri/v2/inunriver_{scenario}_{id}_{year}" map_path = f"inundation/wri/v2/inunriver_{scenario}_{id}_{year}_map" - - copy_zarr_local(test_output_dir, path) - - local_store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard_test", "hazard.zarr")) + # copy_zarr_local(test_output_dir, path) + local_store = zarr.DirectoryStore(os.path.join(test_output_dir, "hazard", "hazard.zarr")) source = OscZarr(store=local_store) target = source - create_tile_set(source, path, target, map_path) + path = "inundation/river_tudelft/v2/flood_depth_historical_1971" + map_path = "inundation/river_tudelft/v2/flood_depth_historical_1971_map" + create_tile_set(source, path, target, map_path, max_zoom=10) def copy_zarr_local(test_output_dir, path): # noqa: F811
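
For reference, the tile arithmetic that create_tile_set relies on reduces to a few
lines of mercantile. The sketch below (bounds and source width are illustrative,
not from the patch) derives the maximum zoom level that preserves the source
resolution by rescaling the source pixel count to the full web-mercator extent,
then finds the XYZ tile range covering the map, mirroring get_tile_bounds:

    import math

    import mercantile

    left, bottom, right, top = -10.0, 40.0, 30.0, 55.0  # map bounds in EPSG:4326
    src_width = 4000  # source pixels across the map's x range (illustrative)
    pixels_per_tile = 256

    # widths of the map and of the whole world in web-mercator metres
    ulx, uly = mercantile.xy(left, top)
    lrx, lry = mercantile.xy(right, bottom)
    ulx_whole, _ = mercantile.xy(*mercantile.ul(mercantile.Tile(0, 0, 0)))
    lrx_whole, _ = mercantile.xy(*mercantile.ul(mercantile.Tile(1, 1, 0)))

    # scale the pixel count up to the whole world and find the zoom level
    # with at least that many pixels across (2**zoom tiles of 256 pixels)
    whole_map_pixels_x = src_width * (lrx_whole - ulx_whole) / (lrx - ulx)
    max_zoom = int(round(math.log2(whole_map_pixels_x / pixels_per_tile)))

    # the slippy-map tiles covering the map at that zoom (cf. get_tile_bounds)
    ul_tile = mercantile.tile(left, top, max_zoom)
    lr_tile = mercantile.tile(right, bottom, max_zoom)
    print(max_zoom, (ul_tile.x, lr_tile.x), (ul_tile.y, lr_tile.y))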