Bccaqv2 split bbox and grid point #60

Merged on Jan 16, 2020 (33 commits)

Commits
805f544  add start_date and end_date to bccaqv2 subset (Dec 19, 2019)
8ef1ffa  split bccaqv2 bbox and grid point processes (Dec 19, 2019)
96646f9  deprecate lon0 and lat0 for ... (Dec 20, 2019)
614260e  add test for bccaqv2 boundingbox subset and ... (Dec 20, 2019)
2ae93bc  add exception message when request fails in tests (Jan 6, 2020)
f817ea4  read tests output in memory instead of writing to disk (Jan 6, 2020)
e5253ef  change the point subset to accept a comma ... (Jan 6, 2020)
107e6b5  add test for deprecation of lat0 and lon0 (Jan 6, 2020)
5d39de2  extract monkeypatching in a single fixture (Jan 6, 2020)
81fe8e0  pin pywps~=4.2.3 (Jan 6, 2020)
5aa7c03  subset multiple grid cells (Jan 6, 2020)
58b99e2  keep global_attributes when merging datasets (Jan 7, 2020)
446f43f  fix dimensions for multiple grid points (Jan 10, 2020)
1e3e91b  fix and test csv conversions (Jan 10, 2020)
b5156d3  skip online heat_wave test (Jan 10, 2020)
a924510  flake8 (Jan 10, 2020)
8acb667  typo and documentation (Jan 10, 2020)
253a4f0  speed up csv creation (Jan 13, 2020)
9071d14  docs (Jan 13, 2020)
623bb59  explicitely close thread pool (Jan 13, 2020)
63a132e  fix csv output dropna (Jan 13, 2020)
85f531a  flake8 (Jan 13, 2020)
6ba2c34  fix multiple grid cells output and csv output (Jan 14, 2020)
7cf9332  formatting (black) and fix tests assertions (Jan 14, 2020)
5c1aacc  drop na values after dropping 'region' index (Jan 14, 2020)
66c4a0e  bump xsubsetpoint version number (Jan 14, 2020)
affab20  extract common inputs to wpsio.py (Jan 14, 2020)
4980409  clarify unit test (Jan 14, 2020)
3780ef7  heatwave inherits from point subset, so the input... (Jan 14, 2020)
e00401c  remove unneeded optimization in heatwave process (Jan 14, 2020)
4f7df80  extract lat and lon to wpsio.py (Jan 15, 2020)
73617f3  flake8 (Jan 15, 2020)
ae632b5  Merge branch 'master' into bccaqv2-split-bbox-and-grid-point (Jan 16, 2020)
41 changes: 20 additions & 21 deletions environment.yml
@@ -1,24 +1,23 @@
 name: finch
 channels:
-  - birdhouse
-  - conda-forge
-  - defaults
+  - birdhouse
+  - conda-forge
+  - defaults
 dependencies:
-  - python>=3.6
-  - pip
-  - jinja2
-  - click
-  - psutil
-  - bottleneck
-  - netcdf4
-  - libnetcdf==4.6.2
-  - numpy
-  - unidecode
-  - dask
-  - xarray>=0.12
-  - scipy
-  - sentry-sdk
-  - siphon
-  - xclim>=0.12.2
-  - pywps>=4.2.3
-
+  - python>=3.6
+  - pip
+  - jinja2
+  - click
+  - psutil
+  - bottleneck
+  - netcdf4
+  - libnetcdf==4.6.2
+  - numpy
+  - unidecode
+  - dask
+  - xarray>=0.12
+  - scipy
+  - sentry-sdk
+  - siphon
+  - xclim>=0.12.2
+  - pywps~=4.2.3
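
The substantive change here is the pywps pin: `~=` is the PEP 440 compatible-release specifier, so patch releases are accepted but the next minor is not. A quick check of that semantics, using the `packaging` library (not part of this PR):

```python
# "~=4.2.3" is equivalent to ">=4.2.3,<4.3": patch updates only, whereas the
# previous ">=4.2.3" would also allow 4.3+ with possible breaking changes.
from packaging.specifiers import SpecifierSet

compatible = SpecifierSet("~=4.2.3")
assert "4.2.5" in compatible
assert "4.3.0" not in compatible
```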
6 changes: 4 additions & 2 deletions finch/processes/__init__.py
@@ -1,6 +1,7 @@
 from .wps_xsubsetbbox import SubsetBboxProcess
 from .wps_xsubsetpoint import SubsetGridPointProcess
-from .wps_xsubset_bccaqv2 import SubsetBCCAQV2Process
+from .wps_xsubsetpoint_bccaqv2 import SubsetGridPointBCCAQV2Process
+from .wps_xsubsetbbox_bccaqv2 import SubsetBboxBCCAQV2Process
 from .wps_xclim_indices import make_xclim_indicator_process
 from .wps_bccaqv2_heatwave import BCCAQV2HeatWave
 import xclim
@@ -29,7 +30,8 @@ def get_indicators(*args):
     [
         SubsetBboxProcess(),
         SubsetGridPointProcess(),
-        SubsetBCCAQV2Process(),
+        SubsetGridPointBCCAQV2Process(),
+        SubsetBboxBCCAQV2Process(),
         BCCAQV2HeatWave(),
     ]
 )
2 changes: 2 additions & 0 deletions finch/processes/subset.py
@@ -44,6 +44,8 @@ def process_resource(resource):
     if threads > 1:
         pool = ThreadPool(processes=threads)
         list(pool.imap_unordered(process_resource, resources))
+        pool.close()
+        pool.join()
     else:
         for r in resources:
             process_resource(r)
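
For context, a minimal standalone sketch (toy code, not finch's module) of the pool lifecycle this change adopts: without `close()` and `join()`, worker threads may still be alive when the handler returns.

```python
from multiprocessing.pool import ThreadPool

def process_all(resources, process_resource, threads=4):
    if threads > 1:
        pool = ThreadPool(processes=threads)
        try:
            # imap_unordered is lazy; wrapping it in list() forces every task to finish
            list(pool.imap_unordered(process_resource, resources))
        finally:
            pool.close()  # stop accepting new tasks
            pool.join()   # wait for all worker threads to exit
    else:
        for r in resources:
            process_resource(r)
```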
33 changes: 26 additions & 7 deletions finch/processes/utils.py
@@ -7,6 +7,7 @@
 from typing import List, Tuple
 from enum import Enum
 
+import numpy as np
 import pandas as pd
 import xarray as xr
 import requests
@@ -31,7 +32,9 @@ def is_opendap_url(url):
     So we need to let the netCDF4 library actually open the file.
     """
     try:
-        content_description = requests.head(url, timeout=5).headers.get("Content-Description")
+        content_description = requests.head(url, timeout=5).headers.get(
+            "Content-Description"
+        )
     except (ConnectionError, MissingSchema, InvalidSchema):
         return False
 
@@ -42,7 +45,7 @@
         dataset = netCDF4.Dataset(url)
     except OSError:
         return False
-    return dataset.disk_format in ('DAP2', 'DAP4')
+    return dataset.disk_format in ("DAP2", "DAP4")
 
 
 class ParsingMethod(Enum):
@@ -241,11 +244,17 @@ def get_attrs_fallback(ds, *args):
 
     ds = ds.rename({variable: output_variable})
 
-    df = ds.to_dataframe()[["lat", "lon", output_variable]]
-    # most runs have timestamp with hour == 12 a few hour == 0 .. make uniform
-    df.index = df.index.map(lambda x: x.replace(hour=12))
+    # most runs have timestamp with hour == 12 a few hour == 0 ... make uniform
+    if not np.all(ds.time.dt.hour == 12):
+        attrs = ds.time.attrs
+        ds["time"] = [y.replace(hour=12) for y in ds.time.values]
+        ds.time.attrs = attrs
+
+    df = ds.to_dataframe()
 
     if calendar not in concat_by_calendar:
+        if "lat" in df.index.names and "lon" in df.index.names:
+            df = df.reset_index(["lat", "lon"])
         concat_by_calendar[calendar] = [df]
     else:
         concat_by_calendar[calendar].append(df[output_variable])
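
A small illustration (toy dataset, assumed behaviour) of why the attributes are saved and restored around the hour normalization: assigning a new `time` coordinate in xarray discards the coordinate's attrs.

```python
import pandas as pd
import xarray as xr

ds = xr.Dataset(coords={"time": pd.date_range("1950-01-01 00:00", periods=3)})
ds.time.attrs["long_name"] = "time"

attrs = ds.time.attrs
ds["time"] = [t.replace(hour=12) for t in pd.to_datetime(ds.time.values)]
ds.time.attrs = attrs  # re-attach the attrs dropped by the assignment

assert (ds.time.dt.hour == 12).all()
```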
@@ -255,7 +264,17 @@
     output_csv_list = []
     for calendar_type, data in concat_by_calendar.items():
         output_csv = output_folder / f"{filename_prefix}_{calendar_type}.csv"
-        pd.concat(data, axis=1).to_csv(output_csv)
+        concat = pd.concat(data, axis=1)
+
+        try:
+            concat = concat.reset_index().set_index("time").drop(columns="region")
+        except KeyError:
+            pass
+
+        dropna_threshold = 3  # lat + lon + at least one value
+        concat.dropna(thresh=dropna_threshold, inplace=True)
+
+        concat.to_csv(output_csv)
         output_csv_list.append(output_csv)
 
     metadata_folder = output_folder / "metadata"
@@ -264,7 +283,7 @@
     metadata_file = metadata_folder / f"{output_variable}.csv"
     metadata_file.write_text(info)
 
-    return output_csv_list, metadata_folder
+    return output_csv_list, str(metadata_folder)
 
 
 def format_metadata(ds) -> str:
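
A toy demonstration (not finch data) of the `dropna(thresh=3)` call above: once `lat` and `lon` are ordinary columns, a row survives only if at least one model column also holds a value, since lat + lon + one value = 3 non-NaN cells.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "lat": [45.5, 45.5, 45.5],
        "lon": [-73.6, -73.6, -73.6],
        "model_a": [1.0, np.nan, np.nan],
        "model_b": [np.nan, 2.0, np.nan],
    },
    index=pd.date_range("1950-01-01", periods=3, name="time"),
)

df = df.dropna(thresh=3)  # keep rows with at least 3 non-NaN values
assert len(df) == 2  # the row with only lat/lon (2 non-NaN cells) is dropped
```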
79 changes: 28 additions & 51 deletions finch/processes/wps_bccaqv2_heatwave.py
@@ -7,7 +7,6 @@
 import xarray as xr
 from xclim.checks import assert_daily
 from xclim.atmos import heat_wave_frequency
-import xclim.run_length
 from pathlib import Path
 from pywps.response.execute import ExecuteResponse
 from pywps.app.exceptions import ProcessError
@@ -38,13 +37,13 @@ def __init__(self):
             "lon",
             "Longitude of point",
             abstract="Longitude located inside the grid-cell to extract.",
-            data_type="float",
+            data_type="string",
         ),
         LiteralInput(
             "lat",
             "Latitude of point",
             abstract="Latitude located inside the grid-cell to extract.",
-            data_type="float",
+            data_type="string",
         ),
         LiteralInput(
             "y0",
@@ -157,41 +156,32 @@ def _handler(self, request: WPSRequest, response: ExecuteResponse):
         warnings.filterwarnings("ignore", category=FutureWarning)
         warnings.filterwarnings("ignore", category=UserWarning)
 
-        # monkeypatch windowed_run_events with a faster version
-        old_windowed_run_events = xclim.run_length.windowed_run_events
-        xclim.run_length.windowed_run_events = rolling_window_events
-
-        try:
-            for n, (tasmin, tasmax) in enumerate(pairs):
-                percentage = start_percentage + int(
-                    n / n_pairs * (end_percentage - start_percentage)
-                )
-                self.write_log(
-                    f"Computing indices for file {n + 1} of {n_pairs}",
-                    response,
-                    percentage,
-                )
-
-                tasmin, tasmax = fix_broken_time_indices(tasmin, tasmax)
-
-                compute_inputs = [i.identifier for i in self.indices_process.inputs]
-                inputs = {
-                    k: v for k, v in request.inputs.items() if k in compute_inputs
-                }
-
-                inputs["tasmin"] = deque([make_nc_input("tasmin")], maxlen=1)
-                inputs["tasmin"][0].file = str(tasmin)
-                inputs["tasmax"] = deque([make_nc_input("tasmax")], maxlen=1)
-                inputs["tasmax"][0].file = str(tasmax)
-
-                out = self.compute_indices(self.indices_process.xci, inputs)
-                out_fn = Path(self.workdir) / tasmin.name.replace(
-                    "tasmin", "heat_wave_frequency"
-                )
-                out.to_netcdf(out_fn)
-                output_files.append(out_fn)
-        finally:
-            xclim.run_length.windowed_run_events = old_windowed_run_events
+        for n, (tasmin, tasmax) in enumerate(pairs):
+            percentage = start_percentage + int(
+                n / n_pairs * (end_percentage - start_percentage)
+            )
+            self.write_log(
+                f"Computing indices for file {n + 1} of {n_pairs}",
+                response,
+                percentage,
+            )
+
+            tasmin, tasmax = fix_broken_time_indices(tasmin, tasmax)
+
+            compute_inputs = [i.identifier for i in self.indices_process.inputs]
+            inputs = {k: v for k, v in request.inputs.items() if k in compute_inputs}
+
+            inputs["tasmin"] = deque([make_nc_input("tasmin")], maxlen=1)
+            inputs["tasmin"][0].file = str(tasmin)
+            inputs["tasmax"] = deque([make_nc_input("tasmax")], maxlen=1)
+            inputs["tasmax"][0].file = str(tasmax)
+
+            out = self.compute_indices(self.indices_process.xci, inputs)
+            out_fn = Path(self.workdir) / tasmin.name.replace(
+                "tasmin", "heat_wave_frequency"
+            )
+            out.to_netcdf(out_fn)
+            output_files.append(out_fn)
 
         warnings.filterwarnings("default", category=FutureWarning)
         warnings.filterwarnings("default", category=UserWarning)
@@ -216,19 +206,6 @@ def log(message_, percentage_):
     return response
 
 
-def rolling_window_events(da, window, dim="time"):
-    window_count = da.rolling(time=window).sum()
-    w = window_count.values[window - 1 :] >= window
-
-    count = np.count_nonzero(w[1:] > w[:-1]) + w[0]
-
-    data = np.array([count], dtype=np.int64)
-    data = data.reshape(())
-    out = xr.DataArray(data, coords={"lon": da.lon, "lat": da.lat})
-
-    return out
-
-
 def fix_broken_time_indices(tasmin: Path, tasmax: Path) -> Tuple[Path, Path]:
     """In a single bccaqv2 dataset, there is an error in the timestamp data.
 
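
The deleted `rolling_window_events` was a point-wise fast path for counting runs of at least `window` consecutive hits. A toy check (NumPy only, not part of the PR) of the counting trick it used: positions where a length-`window` rolling sum reaches `window` form a boolean series, and counting its rising edges gives the number of distinct events.

```python
import numpy as np

da = np.array([1, 1, 1, 0, 1, 1, 1, 1, 0], dtype=bool)
window = 3

# rolling sum over `window` samples (same values an xarray rolling sum yields)
window_count = np.convolve(da, np.ones(window, dtype=int), mode="valid")
w = window_count >= window

# a rising edge (False -> True) marks the start of a new qualifying run
count = np.count_nonzero(w[1:] > w[:-1]) + w[0]
assert count == 2  # two separate runs of >= 3 consecutive True values
```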
40 changes: 5 additions & 35 deletions finch/processes/wps_xsubsetbbox.py
@@ -5,7 +5,7 @@
 from pywps.app.exceptions import ProcessError
 from pywps.inout.outputs import MetaLink4
 from xclim.subset import subset_bbox, subset_gridpoint
-from .wpsio import start_date, end_date
+from .wpsio import start_date, end_date, lat0, lat1, lon0, lon1
 
 from finch.processes.subset import SubsetProcess
 
@@ -25,38 +25,10 @@ def __init__(self):
             max_occurs=1000,
             supported_formats=[FORMATS.NETCDF, FORMATS.DODS],
         ),
-        LiteralInput(
-            "lon0",
-            "Minimum longitude",
-            abstract="Minimum longitude.",
-            data_type="float",
-            default=0,
-            min_occurs=0,
-        ),
-        LiteralInput(
-            "lon1",
-            "Maximum longitude",
-            abstract="Maximum longitude.",
-            data_type="float",
-            default=360,
-            min_occurs=0,
-        ),
-        LiteralInput(
-            "lat0",
-            "Minimum latitude",
-            abstract="Minimum latitude.",
-            data_type="float",
-            default=-90,
-            min_occurs=0,
-        ),
-        LiteralInput(
-            "lat1",
-            "Maximum latitude",
-            abstract="Maximum latitude.",
-            data_type="float",
-            default=90,
-            min_occurs=0,
-        ),
+        lon0,
+        lon1,
+        lat0,
+        lat1,
         start_date,
         end_date,
         LiteralInput(
@@ -111,8 +83,6 @@ def subset(
         lat0 = wps_inputs["lat0"][0].data
         lon1 = self.get_input_or_none(wps_inputs, "lon1")
         lat1 = self.get_input_or_none(wps_inputs, "lat1")
-        # dt0 = wps_inputs['dt0'][0].data or None
-        # dt1 = wps_inputs['dt1'][0].data or None
         start = self.get_input_or_none(wps_inputs, "start_date")
         end = self.get_input_or_none(wps_inputs, "end_date")
         variables = [r.data for r in wps_inputs.get("variable", [])]
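
For reference, a minimal sketch of the `wpsio.py` pattern these diffs converge on: shared inputs defined once and imported by every process that needs them. The defaults shown are the ones deleted from wps_xsubsetbbox.py; the actual contents of `wpsio.py` are not part of this diff, so treat this as an assumption.

```python
from pywps import LiteralInput

# Shared bounding-box inputs, defined once and reused across processes.
lon0 = LiteralInput(
    "lon0",
    "Minimum longitude",
    abstract="Minimum longitude.",
    data_type="float",
    default=0,
    min_occurs=0,
)
lat0 = LiteralInput(
    "lat0",
    "Minimum latitude",
    abstract="Minimum latitude.",
    data_type="float",
    default=-90,
    min_occurs=0,
)
```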