Fast generation of map tiles where map covers only part of globe (#54)
* Working version

---------

Signed-off-by: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com>
joemoorhouse authored Apr 8, 2024
1 parent 49eddd8 commit 02209eb
Showing 10 changed files with 467 additions and 143 deletions.
13 changes: 8 additions & 5 deletions src/hazard/onboard/ethz_litpop.py
@@ -73,6 +73,7 @@ def prepare(self, working_dir: Optional[str] = None):
if not isinstance(self.fs, LocalFileSystem):
# e.g. we are copying to S3; download to specified working directory,
# but then copy to self.source_dir
assert working_dir is not None
download_and_unzip(self.zip_url, working_dir, self.zip_url.split("/")[-1].split(".")[0])
for file in os.listdir(working_dir):
with open(file, "rb") as f:
@@ -159,10 +160,11 @@ def _resources(self) -> Dict[str, HazardResource]:
hazard_type="SpatialDistribution",
indicator_id=key,
indicator_model_gcm="",
indicator_model_id=None,
path=path,
params={},
display_name=resource_map[key]["display_name"],
description=resource_map[key]["description"],
display_name=str(resource_map[key]["display_name"]),
description=str(resource_map[key]["description"]),
group_id="",
display_groups=[],
map=MapInfo(
@@ -178,13 +180,14 @@
nodata_index=0,
name="heating",
min_value=0.0,
max_value=resource_map[key]["max_value"],
units=resource_map[key]["units"],
max_value=resource_map[key]["max_value"], # type:ignore
units=resource_map[key]["units"], # type:ignore
),
index_values=None,
path="maps/" + path + "_map",
source="map_array_pyramid",
),
units=resource_map[key]["units"],
units=str(resource_map[key]["units"]),
scenarios=[
Scenario(id="historical", years=[2014]),
],
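The changes in this file are type-checker fixes: `assert working_dir is not None` narrows an `Optional[str]` before use, and the `str(...)` casts (or targeted `# type:ignore` markers) handle values read from a dict whose value type is a union. A minimal sketch of both patterns, with illustrative names not taken from the codebase:

from typing import Dict, Optional, Union

def prepare(working_dir: Optional[str] = None) -> None:
    # mypy treats working_dir as Optional[str]; the assert narrows it to str,
    # so str methods below type-check.
    assert working_dir is not None
    print(working_dir.upper())

prepare("/tmp/working")

# Values looked up in a Dict[str, Union[str, float]] come back typed as
# Union[str, float]; wrapping in str(...) yields the concrete str that a
# keyword argument may require.
resource_map: Dict[str, Union[str, float]] = {"display_name": "LitPop", "max_value": 1.0}
display_name: str = str(resource_map["display_name"])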
82 changes: 55 additions & 27 deletions src/hazard/onboard/tudelft_flood.py
@@ -4,6 +4,7 @@
from pathlib import PurePosixPath
from typing import Any, Iterable, Optional

import numpy as np
import rioxarray # noqa: F401
import xarray as xr
from dask.distributed import Client
@@ -24,7 +25,8 @@
class BatchItem:
scenario: str
central_year: int
input_dataset_filename: str
flood_depth_filename: str
extent_protected_filename: str


class TUDelftRiverFlood(IndicatorModel[BatchItem]):
@@ -72,31 +74,37 @@ def __init__(self, source_dir: str, fs: Optional[AbstractFileSystem] = None):
self._resource = list(self.inventory())[0]

def batch_items(self) -> Iterable[BatchItem]:

return [
BatchItem(
scenario="historical",
central_year=1971,
input_dataset_filename="River_flood_depth_1971_2000_hist_{return_period}.tif",
flood_depth_filename="River_flood_depth_1971_2000_hist_{return_period}.tif",
extent_protected_filename="River_flood_extent_1971_2000_hist_with_protection.tif",
),
BatchItem(
scenario="rcp4p5",
central_year=2035,
input_dataset_filename="River_flood_depth_2021_2050_RCP45_{return_period}.tif",
flood_depth_filename="River_flood_depth_2021_2050_RCP45_{return_period}.tif",
extent_protected_filename="River_flood_extent_2021_2050_RCP45_with_protection.tif",
),
BatchItem(
scenario="rcp8p5",
central_year=2035,
input_dataset_filename="River_flood_depth_2021_2050_RCP85_{return_period}.tif",
flood_depth_filename="River_flood_depth_2021_2050_RCP85_{return_period}.tif",
extent_protected_filename="River_flood_extent_2021_2050_RCP85_with_protection.tif",
),
BatchItem(
scenario="rcp4p5",
central_year=2085,
input_dataset_filename="River_flood_depth_2071_2100_RCP45_{return_period}.tif",
flood_depth_filename="River_flood_depth_2071_2100_RCP45_{return_period}.tif",
extent_protected_filename="River_flood_extent_2071_2100_RCP45_with_protection.tif",
),
BatchItem(
scenario="rcp8p5",
central_year=2085,
input_dataset_filename="River_flood_depth_2071_2100_RCP85_{return_period}.tif",
flood_depth_filename="River_flood_depth_2071_2100_RCP85_{return_period}.tif",
extent_protected_filename="River_flood_extent_2071_2100_RCP85_with_protection.tif",
),
]
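Each `flood_depth_filename` above is a template rather than a literal path; `run_single` substitutes the return-period token per raster. A sketch of that substitution, where the `return_period_str` values are assumed stand-ins for the class attribute defined outside this hunk:

template = "River_flood_depth_1971_2000_hist_{return_period}.tif"
return_period_str = {10: "0010y", 100: "0100y", 1000: "1000y"}  # assumed format
print(template.format(return_period=return_period_str[100]))
# River_flood_depth_1971_2000_hist_0100y.tif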

@@ -115,34 +123,54 @@ def prepare(self, working_dir: Optional[str] = None):

def run_single(self, item: BatchItem, source: Any, target: ReadWriteDataArray, client: Client):
assert isinstance(target, OscZarr)
input = PurePosixPath(self.source_dir, item.input_dataset_filename)
full_path_depth_format = PurePosixPath(self.source_dir, item.flood_depth_filename)
full_path_extent = PurePosixPath(self.source_dir, item.extent_protected_filename)
assert target is None or isinstance(target, OscZarr)
shape = [39420, 38374]  # y, x; not all return periods have the same size (the first is smaller, at 38371)
for i, return_period in enumerate(self.return_periods):
filename = str(input).format(return_period=self.return_period_str[return_period])
with self.fs.open(filename, "rb") as f:
da = xr.open_rasterio(f).isel(band=0)
# bounds = da.rio.bounds()
if return_period == self.return_periods[0]:
z = target.create_empty(
self._resource.path.format(scenario=item.scenario, year=item.central_year),
shape[1],
shape[0],
da.rio.transform(),
str(da.crs),
indexes=self.return_periods,
)
values = da.data
no_data = da.attrs["nodatavals"]
values[values == no_data] = float("nan")
z[i, 0 : len(da.y), 0 : len(da.x)] = values[:, :]
full_path_depth = str(full_path_depth_format).format(return_period=self.return_period_str[return_period])
with self.fs.open(full_path_depth, "rb") as fd:
dad = xr.open_rasterio(fd).isel(band=0)
with self.fs.open(full_path_extent, "rb") as fe:
dae = xr.open_rasterio(fe).isel(band=0)
# bounds = da.rio.bounds()
if return_period == self.return_periods[0]:
z = target.create_empty(
self._resource.path.format(scenario=item.scenario, year=item.central_year),
shape[1],
shape[0],
dad.rio.transform(),
str(dad.crs),
indexes=self.return_periods,
)
# dad_nodata = 65535
if (
dad.shape[1] == 38371
): # corrections for various possible errors whereby coordinates are missing for certain files:
dae = dae[:, 0:38371]
if dae.shape[1] == 38375:
dae = dae[:, 0:38374]
if dad.shape[1] == 38375:
dad = dad[:, 0:38374]
if dae.shape[0] == 39385:
dad = dad[35:, :]

# not quite the same coordinates: check if close, within rounding error, and align exactly
assert np.abs(np.array(dae.x) - np.array(dad.x)).max() < 1e-4
assert np.abs(np.array(dae.y) - np.array(dad.y)).max() < 1e-4
dae = dae.assign_coords({"x": dad.x, "y": dad.y})
da_combined = xr.where(dae <= return_period, dad, 0)
values = da_combined.data
depth_no_data = dad.attrs["nodatavals"]
values[values == depth_no_data] = float("nan")
z[i, 0 : len(da_combined.y), 0 : len(da_combined.x)] = values[:, :]

def create_maps(self, source: OscZarr, target: OscZarr):
"""
Create map images.
"""
...
create_tiles_for_resource(source, target, self._resource)
create_tiles_for_resource(source, target, self._resource, max_zoom=10)

def inventory(self) -> Iterable[HazardResource]:
"""Get the (unexpanded) HazardModel(s) that comprise the inventory."""
@@ -182,8 +210,8 @@ def inventory(self) -> Iterable[HazardResource]:
units="metres",
scenarios=[
Scenario(id="historical", years=[1971]),
Scenario(id="rcp45", years=[2050, 2070]),
Scenario(id="rcp85", years=[2050, 2070]),
Scenario(id="rcp4p5", years=[2035, 2085]),
Scenario(id="rcp8p5", years=[2035, 2085]),
],
)
]
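The substance of the `run_single` rewrite is that flood depth is now combined with the with-protection extent layer: a cell keeps its depth only where the event's return period exceeds the local protection standard stored in the extent raster. Because `xr.where` aligns its arguments on coordinates, the preceding `assign_coords` step is what stops nearly-equal floating-point coordinates from being treated as disjoint and silently dropped during alignment. A toy sketch of the masking step, with illustrative arrays and values:

import xarray as xr

x = [0.0, 1.0, 2.0]
# Stand-ins for the depth raster (dad) and protection-extent raster (dae);
# each dae cell holds the return period its flood protection withstands.
dad = xr.DataArray([[0.5, 1.2, 3.0]], dims=("y", "x"), coords={"y": [0.0], "x": x})
dae = xr.DataArray([[100.0, 30.0, 1000.0]], dims=("y", "x"), coords={"y": [0.0], "x": x})

return_period = 100.0
# Protection fails where its standard is <= the event return period;
# elsewhere the cell stays dry (depth 0).
da_combined = xr.where(dae <= return_period, dad, 0)
print(da_combined.values)  # [[0.5 1.2 0. ]]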
2 changes: 2 additions & 0 deletions src/hazard/sources/osc_zarr.py
@@ -244,6 +244,8 @@ def _zarr_create(
chunks=chunks,
dtype="f4",
overwrite=overwrite,
write_empty_chunks=False,
fill_value=float("nan"),
) # array_path interpreted as path within group
if isinstance(indexes, np.ndarray) and indexes.dtype in [
"int16",
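These two arguments are what make tile generation fast when the map covers only part of the globe: with `fill_value` set to NaN and `write_empty_chunks=False`, chunks containing nothing but the fill value are skipped on write, so empty ocean costs neither storage nor I/O. A minimal sketch of the effect, assuming zarr v2 (>= 2.11, where `write_empty_chunks` was introduced):

import zarr

z = zarr.create(
    shape=(10_000, 10_000),
    chunks=(1_000, 1_000),
    dtype="f4",
    fill_value=float("nan"),
    write_empty_chunks=False,  # chunks that are all fill_value are not stored
    store=zarr.MemoryStore(),
)
z[:1_000, :1_000] = 1.0  # populate a single chunk
print(z.nchunks, z.nchunks_initialized)  # 100 1 -- the 99 empty chunks were never written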
44 changes: 44 additions & 0 deletions src/hazard/utilities/s3_utilities.py
@@ -1,12 +1,56 @@
import logging
import os
import pathlib
import sys
from typing import Callable, Optional, Sequence

import boto3

logger = logging.getLogger(__name__)


def copy_local_to_dev(zarr_dir: str, array_path: str, dry_run=False):
"""Copy zarr array from a local directory to the development bucket.
Requires environment variables:
OSC_S3_BUCKET_DEV=physrisk-hazard-indicators-dev01
OSC_S3_ACCESS_KEY_DEV=...
OSC_S3_SECRET_KEY_DEV=...
Args:
zarr_dir (str): Directory of the Zarr group, i.e. /<path>/hazard/hazard.zarr.
array_path (str): The path of the array within the group.
dry_run (bool, optional): If True, log the action that would be taken without actually executing. Defaults to False.
"""

logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(filename="batch.log"),
logging.StreamHandler(sys.stdout),
],
)

s3_target_client = boto3.client(
"s3",
aws_access_key_id=os.environ["OSC_S3_ACCESS_KEY_DEV"],
aws_secret_access_key=os.environ["OSC_S3_SECRET_KEY_DEV"],
)
target_bucket_name = os.environ["OSC_S3_BUCKET_DEV"]
logger.info(f"Source path {zarr_dir}; target bucket {target_bucket_name}")

files = [f for f in pathlib.Path(zarr_dir, array_path).iterdir() if f.is_file()]
logger.info(f"Copying {len(files)} files")
if not dry_run:
for i, file in enumerate(files):
with open(file, "rb") as f:
data = f.read()
target_key = str(pathlib.PurePosixPath("hazard", "hazard.zarr", array_path, file.name))
s3_target_client.put_object(Body=data, Bucket=target_bucket_name, Key=target_key)
if i % 100 == 0:
logger.info(f"Completed {i}/{len(files)}")


def copy_dev_to_prod(prefix: str, dry_run=False):
"""Use this script to copy files with the prefix specified from
dev S3 to prod S3.
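A hypothetical invocation of the new `copy_local_to_dev` helper; the array path and credential values below are placeholders, and the import path assumes the package layout under `src/`:

import os

from hazard.utilities.s3_utilities import copy_local_to_dev

# Placeholder credentials; real values come from your environment.
os.environ["OSC_S3_BUCKET_DEV"] = "physrisk-hazard-indicators-dev01"
os.environ["OSC_S3_ACCESS_KEY_DEV"] = "..."
os.environ["OSC_S3_SECRET_KEY_DEV"] = "..."

# Dry run first to log what would be copied, then the real copy.
copy_local_to_dev("/data/hazard/hazard.zarr", "maps/example_array_map", dry_run=True)
copy_local_to_dev("/data/hazard/hazard.zarr", "maps/example_array_map")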