Skip to content

Commit

Permalink
feat: add first/last processes (#66)
Browse files Browse the repository at this point in the history
* update pre-commit

* update pre-commit hook

* fix test

* fix first/last

* minor fix

* small fixes to first to make it use pd.isnull

* bump process specs

---------

Co-authored-by: Lukas Weidenholzer <lukas.weidenholzer@eodc.eu>
  • Loading branch information
ValentinaHutter and Lukas Weidenholzer authored Apr 26, 2023
1 parent 39a4af8 commit dd88c1d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 42 deletions.
42 changes: 40 additions & 2 deletions openeo_processes_dask/process_implementations/arrays.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import itertools
import logging
from typing import Any, Optional

import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr
from numpy.typing import ArrayLike
from xarray.core.duck_array_ops import isnull, notnull

from openeo_processes_dask.process_implementations.cubes.utils import _is_dask_array
from openeo_processes_dask.process_implementations.exceptions import (
Expand All @@ -25,8 +28,8 @@
"array_contains",
"array_find",
"array_labels",
# "first",
# "last",
"first",
"last",
# "order",
# "rearrange",
# "sort",
Expand Down Expand Up @@ -182,3 +185,38 @@ def array_labels(data: ArrayLike) -> ArrayLike:
if len(data.shape) > 1:
raise TooManyDimensions("array_labels is only implemented for 1D arrays.")
return np.arange(len(data))


def first(
data: ArrayLike,
ignore_nodata: Optional[bool] = True,
axis: Optional[str] = None,
):
if len(data) == 0:
return np.nan
if axis is None:
data = data.flatten()
axis = 0
if ignore_nodata:
nan_mask = ~pd.isnull(data) # create mask for valid values (not np.nan)
idx_first = np.argmax(nan_mask, axis=axis)
first_elem = np.take(data, indices=0, axis=axis)

if pd.isnull(np.asarray(first_elem)).any():
for i in range(np.max(idx_first) + 1):
first_elem = np.nan_to_num(first_elem, True, np.take(data, i, axis))
else: # take the first element, no matter np.nan values are in the array
first_elem = np.take(data, indices=0, axis=axis)
return first_elem


def last(
data: ArrayLike,
ignore_nodata: Optional[bool] = True,
axis: Optional[int] = None,
):
if len(data) == 0:
return np.nan
data = np.flip(data, axis=axis) # flip data to retrieve the first valid element
last_elem = first(data, ignore_nodata=ignore_nodata, axis=axis)
return last_elem
2 changes: 1 addition & 1 deletion openeo_processes_dask/specs/openeo-processes
Submodule openeo-processes updated 2 files
+0 −0 first.json
+0 −0 last.json
16 changes: 9 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def temporal_interval(interval=["2018-05-01", "2018-06-01"]) -> TemporalInterval

@pytest.fixture
def process_registry() -> ProcessRegistry:
registry = ProcessRegistry(wrap_funcs=[process])

standard_processes = [
func
for _, func in inspect.getmembers(
Expand All @@ -64,16 +66,16 @@ def process_registry() -> ProcessRegistry:
]

specs_module = importlib.import_module("openeo_processes_dask.specs")
specs = {
func.__name__: getattr(specs_module, func.__name__)
for func in standard_processes
}

registry = ProcessRegistry(wrap_funcs=[process])

specs = {}
for func in standard_processes:
if hasattr(specs_module, func.__name__):
specs[func.__name__] = getattr(specs_module, func.__name__)
else:
logger.warning("Process {} does not have a spec.")
registry[func.__name__] = Process(
spec=specs[func.__name__], implementation=func
spec=specs[func.__name__] if func.__name__ in specs else None,
implementation=func,
)

return registry
Expand Down
92 changes: 60 additions & 32 deletions tests/test_arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dask
import dask.array as da
import numpy as np
import pandas as pd
import pytest
import xarray as xr
from openeo_pg_parser_networkx.pg_schema import ParameterReference
Expand Down Expand Up @@ -204,6 +205,49 @@ def test_array_labels():
array_labels(np.array([[1, 0, 3, 2], [5, 0, 6, 4]]))


def test_first():
assert first(np.array([1, 0, 3, 2])) == 1
assert pd.isnull(first(np.array([np.nan, 2, 3]), ignore_nodata=False))
assert first(np.array([np.nan, 2, 3]), ignore_nodata=True) == 2
assert pd.isnull(first([]))


def test_first_along_axis():
multi_axis_array = np.array([[1, 0, 3, 2], [np.nan, 6, 7, 9]])
expected_result_0_true = np.array([1, 0, 3, 2])
expected_result_1_true = np.array([1, 6])
expected_result_0_false = np.array([1, 0, 3, 2])
expected_result_1_false = np.array([1, np.nan])

assert np.array_equal(
first(multi_axis_array, ignore_nodata=True, axis=0),
expected_result_0_true,
equal_nan=True,
)
assert np.array_equal(
first(multi_axis_array, ignore_nodata=True, axis=1),
expected_result_1_true,
equal_nan=True,
)
assert np.array_equal(
first(multi_axis_array, ignore_nodata=False, axis=0),
expected_result_0_false,
equal_nan=True,
)
assert np.array_equal(
first(multi_axis_array, ignore_nodata=False, axis=1),
expected_result_1_false,
equal_nan=True,
)


def test_last():
assert last([1, 0, 3, 2]) == 2
assert pd.isnull(last([0, 1, np.nan], ignore_nodata=False))
assert last([0, 1, np.nan], ignore_nodata=True) == 1
assert pd.isnull(last([]))


@pytest.mark.parametrize("size", [(3, 3, 2, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_reduce_dimension(
Expand Down Expand Up @@ -234,35 +278,19 @@ def test_reduce_dimension(
assert output_cube.dims == ("x", "y", "t")
xr.testing.assert_equal(output_cube, xr.zeros_like(output_cube))

# _process = partial(
# process_registry["first"].implementation,
# data=ParameterReference(from_parameter="data"),
# ignore_nodata=True,
# )
# input_cube[0, :, :, :2] = np.nan
# input_cube[0, :, :, 2] = 1
# output_cube = reduce_dimension(data=input_cube, reducer=_process, dimension="bands")
# general_output_checks(
# input_cube=input_cube,
# output_cube=output_cube,
# verify_attrs=False,
# verify_crs=True,
# )
# assert output_cube.dims == ("x", "y", "t")
# xr.testing.assert_equal(output_cube, xr.ones_like(output_cube))

# _process = partial(
# process_registry["last"].implementation,
# data=ParameterReference(from_parameter="data"),
# ignore_nodata=True,
# )
# input_cube[:, :, :, -1] = 0
# output_cube = reduce_dimension(data=input_cube, reducer=_process, dimension="bands")
# general_output_checks(
# input_cube=input_cube,
# output_cube=output_cube,
# verify_attrs=False,
# verify_crs=True,
# )
# assert output_cube.dims == ("x", "y", "t")
# xr.testing.assert_equal(output_cube, xr.zeros_like(output_cube))
_process = partial(
process_registry["first"].implementation,
data=ParameterReference(from_parameter="data"),
ignore_nodata=True,
)
input_cube[0, :, :, :2] = np.nan
input_cube[0, :, :, 2] = 1
output_cube = reduce_dimension(data=input_cube, reducer=_process, dimension="bands")
general_output_checks(
input_cube=input_cube,
output_cube=output_cube,
verify_attrs=False,
verify_crs=True,
)
assert output_cube.dims == ("x", "y", "t")
xr.testing.assert_equal(output_cube, xr.ones_like(output_cube))

0 comments on commit dd88c1d

Please sign in to comment.