
feat: add array order processes #65

Merged

Changes from 11 commits
2 changes: 1 addition & 1 deletion openeo_processes_dask/core.py
@@ -66,7 +66,7 @@ def wrapper(
else:
resolved_kwargs[k] = arg

special_args = ["axis", "keepdims"]
special_args = ["axis", "keepdims", "source_transposed_axis"]
# Remove 'axis' and keepdims parameter if not expected in function signature.
for arg in special_args:
if arg not in inspect.signature(f).parameters:
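For context, a minimal standalone sketch of the kwarg-stripping pattern this hunk extends; the helper name strip_special_args is hypothetical, the actual logic lives inside the wrapper in core.py:

```python
import inspect

def strip_special_args(f, resolved_kwargs: dict) -> dict:
    # Hypothetical helper: injected arguments are only forwarded if the
    # wrapped process implementation declares them in its signature.
    special_args = ["axis", "keepdims", "source_transposed_axis"]
    for arg in special_args:
        if arg not in inspect.signature(f).parameters:
            resolved_kwargs.pop(arg, None)
    return resolved_kwargs
```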
107 changes: 104 additions & 3 deletions openeo_processes_dask/process_implementations/arrays.py
@@ -27,9 +27,9 @@
"array_labels",
# "first",
# "last",
# "order",
# "rearrange",
# "sort",
"order",
"rearrange",
"sort",
]


@@ -182,3 +182,104 @@ def array_labels(data: ArrayLike) -> ArrayLike:
if len(data.shape) > 1:
raise TooManyDimensions("array_labels is only implemented for 1D arrays.")
return np.arange(len(data))


def order(
data: ArrayLike,
asc: Optional[bool] = True,
nodata: Optional[bool] = None,
axis: Optional[int] = None,
):
if isinstance(data, list):
data = np.asarray(data)
if len(data) == 0:
return data

# See https://github.com/dask/dask/issues/4368
logger.warning(
"order: Dask does not support lazy sorting of arrays, therefore the array is loaded into memory here. This might fail for arrays that don't fit into memory."
)

if asc:
permutation_idxs = np.argsort(data, kind="mergesort", axis=axis)
else: # [::-1] not possible
permutation_idxs = np.argsort(
-data, kind="mergesort", axis=axis
Contributor:
This only works with numerical types, but will fail if trying to sort string or datetime arrays!
Why not just use np.flip on the found indices?

Collaborator Author:
I had used flip before, but switched to this approach because of the following side effect:
Using the example in the documentation, order(data = [6,-1,2,null,7,4,null,8,3,9,9], asc = false, nodata = false) should result in [3, 6, 9, 10, 7, 4, 0, 5, 8, 2, 1].
With np.flip our result becomes [6, 3, 10, 9, 7, 4, 0, 5, 8, 2, 1], so values that are equal end up in the opposite order. Swapping those back after the flip does not seem very convenient to me - what do you think?
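A minimal numpy sketch of the tie-ordering difference discussed here, using the documentation example (nodata handling omitted):

```python
import numpy as np

data = np.array([6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9])

# Flipping a stable ascending argsort reverses the order of the tied 9s
# (index 10 before index 9) and puts the NaN positions first.
print(np.flip(np.argsort(data, kind="mergesort")))
# [ 6  3 10  9  7  4  0  5  8  2  1]

# Negating the data before the stable argsort keeps ties in their
# original order (index 9 before index 10); NaNs still sort last.
print(np.argsort(-data, kind="mergesort"))
# [ 9 10  7  4  0  5  8  2  1  3  6]
```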

Contributor (@LukeWeidenwalker, Feb 24, 2023):
Ah, thanks for this - hmm, we could count the number of NaNs first, flip only array[index_last_nan:], and stitch the pieces back together? Then we should have strings/datetimes covered too!

Collaborator Author (@ValentinaHutter, Feb 24, 2023):
I think what you are referring to would result in an array like [3, 6, 10, 9, 7, 4, 0, 5, 8, 2, 1], so the NaN values would be handled. But if a value occurs twice in the data (like the "9" in the example), the order of those entries is still flipped. (The result in the example should be [3, 6, 9, 10, 7, 4, 0, 5, 8, 2, 1].)

Contributor:
Hmm. @m-mohr what do you think about relaxing the "Ties will be left in their original ordering" requirement in the process spec for order? Is this strictly necessary for some use case? The implementation on our end would be simpler if this didn't have to be guaranteed.

Member:
To get comparable and reproducible results, you can't leave this undefined.

Contributor:
Okay, that's fair, so we cannot drop this requirement completely - what about changing it such that ties keep the original ordering if asc=True and are flipped otherwise?
Otherwise I currently don't see a way to do this so that arrays of strings or datetimes can also be ordered; we'd have to throw an error in those cases.

Member:
@LukeWeidenwalker That's a good point - I think we can clarify that, as that was the original intention anyway. Please open an issue in openeo-processes and I'll prepare a PR for 2.0.

    ) # to get the indices in descending order, the sign of the data is changed

if nodata is None: # ignore np.nan values
if len(data.shape) > 1:
raise ValueError(
"order with nodata=None is not supported for arrays with more than one dimension, as this would result in sparse multi-dimensional arrays."
)
# sort the original data first, to get correct position of no data values
sorted_data = np.take_along_axis(data, permutation_idxs, axis=axis)
return permutation_idxs[~pd.isnull(sorted_data)]
elif nodata is False: # put location/index of np.nan values first
# sort the original data first, to get correct position of no data values
sorted_data = data[permutation_idxs]
return np.append(
permutation_idxs[pd.isnull(sorted_data)],
permutation_idxs[~pd.isnull(sorted_data)],
)
elif nodata is True: # default argsort behaviour, np.nan values are put last
return permutation_idxs
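A usage sketch for the function above, based on the documentation example discussed in the review thread; the expected outputs are taken from the tests added in this PR:

```python
import numpy as np
from openeo_processes_dask.process_implementations.arrays import order

data = [6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9]

print(order(data, asc=True, nodata=None))    # [1 2 8 5 0 4 7 9 10]      (NaN positions dropped)
print(order(data, asc=False, nodata=False))  # [3 6 9 10 7 4 0 5 8 2 1]  (NaN positions first)
print(order(data, asc=False, nodata=True))   # [9 10 7 4 0 5 8 2 1 3 6]  (NaN positions last)
```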


def rearrange(
data: ArrayLike,
order: ArrayLike,
axis: Optional[int] = None,
source_transposed_axis: int = None,
):
if len(data) == 0:
return data
if isinstance(data, list):
data = np.asarray(data)
if isinstance(order, list):
order = np.asarray(order)

if len(data.shape) != len(order.shape):
Collaborator Author:
wouldn't it make sense to also allow len(order.shape) == 1 here? From the examples in the definition https://processes.openeo.org/#rearrange this could also be valid, and I think it makes sense to always take the same element of an array over a specific dimension... this could be handled with np.take automatically :)

Contributor:
Hmm - the spec seems ambiguous about that to me, since all the examples have an equal number of axes. We could add extra axes and broadcast the order array such that it fits the data array. I agree that it's a possible enhancement, but adding this would need more tests for higher-dimensional arrays. I think we can simplify our lives here and just throw an error saying that this isn't supported until someone actually needs exactly this?

Member:
Regardless of what type the elements are, the array is just re-arranged according to the order on the first level.

So if for example you have rearrange(data = [[2,3], [4,3]], order=[1,0]), you'd get [[4,3], [2,3]]. Hope that helps.

Contributor (@LukeWeidenwalker, Feb 27, 2023):
Thanks for this comment - so the order array is always expected to only have a single axis?

Member:
Yes, indeed. It is of type array<integer> so can't be multi-dimensional.
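A small numpy sketch of the "first level only" behaviour described in this thread (plain numpy, not the rearrange implementation below):

```python
import numpy as np

data = np.array([[2, 3], [4, 3]])
order = np.array([1, 0])

# Re-arranging along the first axis only, as in the spec example:
print(np.take(data, order, axis=0))
# [[4 3]
#  [2 3]]
```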

raise ValueError(
f"rearrange: number of axes on data ({len(data.shape)}) != number of axes ({len(order.shape)}) on order. rearrange does not support broadcasting in this case."
)

# This is to allow for the fact that apply_dimension can rearrange dimensions to put core dimensions in the back
if source_transposed_axis is not None:
order = np.moveaxis(order, source_transposed_axis, -1)

logger.warning(
"rearrange: This operation cannot be performed lazily, therefore the array will be loaded into memory here. This might fail for arrays that don't fit into memory."
)

return np.take_along_axis(data, indices=order, axis=axis)
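To illustrate the source_transposed_axis handling above, a small np.moveaxis sketch with made-up shapes:

```python
import numpy as np

# An order array whose re-arranged dimension sits at position 0 in the source cube...
order = np.zeros((4, 5, 6), dtype=int)

# ...is re-aligned so that dimension ends up last, matching how
# apply_dimension moves the core dimension to the back.
print(np.moveaxis(order, 0, -1).shape)  # (5, 6, 4)
```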


def sort(
data: ArrayLike,
asc: Optional[bool] = True,
nodata: Optional[bool] = None,
axis: Optional[int] = None,
):
if isinstance(data, list):
data = np.asarray(data)
if len(data) == 0:
return data
if asc:
data_sorted = np.sort(data, axis=axis)
else: # [::-1] not possible
data_sorted = -np.sort(
-data, axis=axis
    ) # to get the values in descending order, the data is negated, sorted, and negated back

if nodata is None: # ignore np.nan values
nan_idxs = pd.isnull(data_sorted)
return data_sorted[~nan_idxs]
elif nodata == False: # put np.nan values first
nan_idxs = pd.isnull(data_sorted)
data_sorted_flip = np.flip(data_sorted, axis=axis)
nan_idxs_flip = pd.isnull(data_sorted_flip)
data_sorted_flip[~nan_idxs_flip] = data_sorted[~nan_idxs]
return data_sorted_flip
elif nodata == True: # default sort behaviour, np.nan values are put last
return data_sorted
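A plain numpy/pandas sketch of the flip-and-reassign trick used in the nodata=False branch above (ascending case):

```python
import numpy as np
import pandas as pd

data = np.array([6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9])

data_sorted = np.sort(data)          # ascending, NaNs last
flipped = np.flip(data_sorted)       # NaNs now sit at the front
# Overwrite the non-NaN slots with the ascending values again:
flipped[~pd.isnull(flipped)] = data_sorted[~pd.isnull(data_sorted)]
print(flipped)
# [nan nan -1.  2.  3.  4.  6.  7.  8.  9.  9.]
```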
@@ -62,6 +62,7 @@ def apply_dimension(
"named_parameters": named_parameters,
"axis": reordered_data.get_axis_num(dimension),
"keepdims": True,
"source_transposed_axis": data.get_axis_num(dimension),
},
exclude_dims={dimension},
)
70 changes: 70 additions & 0 deletions tests/test_apply.py
@@ -1,5 +1,6 @@
from functools import partial

import dask.array as da
import numpy as np
import pytest
import xarray as xr
@@ -156,3 +157,72 @@ def test_apply_dimension_target_dimension(
verify_crs=False,
expected_results=expected_output,
)


@pytest.mark.parametrize("size", [(6, 5, 4, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_apply_dimension_ordering_processes(
temporal_interval, bounding_box, random_raster_data, process_registry
):
input_cube = create_fake_rastercube(
data=random_raster_data,
spatial_extent=bounding_box,
temporal_extent=temporal_interval,
bands=["B02", "B03", "B04", "B08"],
backend="dask",
)

_process_order = partial(
process_registry["order"],
data=ParameterReference(from_parameter="data"),
nodata=True,
)

output_cube_order = apply_dimension(
data=input_cube,
process=_process_order,
dimension="x",
target_dimension="target",
)

expected_output_order = np.argsort(input_cube.data, kind="mergesort", axis=0)

np.testing.assert_array_equal(output_cube_order.data, expected_output_order)
# This is to remind us that currently dask arrays don't support sorting and notify us should that change in a future version.
assert isinstance(output_cube_order.data, np.ndarray)

_process_rearrange = partial(
process_registry["rearrange"],
data=ParameterReference(from_parameter="data"),
order=da.from_array(expected_output_order),
)

output_cube_rearrange = apply_dimension(
data=input_cube, process=_process_rearrange, dimension="x", target_dimension="x"
)

expected_output_rearrange = np.take_along_axis(
input_cube.data, indices=expected_output_order, axis=0
)

np.testing.assert_array_equal(output_cube_rearrange.data, expected_output_rearrange)
# This is to remind us that currently dask arrays don't support sorting and notify us should that change in a future version.
assert isinstance(output_cube_rearrange.data, np.ndarray)

_process_sort = partial(
process_registry["sort"],
data=ParameterReference(from_parameter="data"),
nodata=True,
)

output_cube_sort = apply_dimension(
data=input_cube, process=_process_sort, dimension="x", target_dimension="target"
)

expected_output_sort = np.sort(input_cube.data, axis=0)

np.testing.assert_array_equal(output_cube_sort.data, expected_output_sort)
# This is to remind us that currently dask arrays don't support sorting and notify us should that change in a future version.
assert isinstance(output_cube_sort.data, np.ndarray)

np.testing.assert_array_equal(output_cube_sort.data, output_cube_rearrange.data)
103 changes: 103 additions & 0 deletions tests/test_arrays.py
@@ -201,6 +201,109 @@ def test_array_labels():
array_labels(np.array([[1, 0, 3, 2], [5, 0, 6, 4]]))


@pytest.mark.parametrize(
"data, asc, nodata, expected",
[
(
[6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9],
True,
None,
[1, 2, 8, 5, 0, 4, 7, 9, 10],
),
(
[6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9],
True,
True,
[1, 2, 8, 5, 0, 4, 7, 9, 10, 3, 6],
),
(
[6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9],
False,
True,
[9, 10, 7, 4, 0, 5, 8, 2, 1, 3, 6],
),
(
[6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9],
False,
False,
[3, 6, 9, 10, 7, 4, 0, 5, 8, 2, 1],
),
],
)
def test_order(data, asc, nodata, expected):
np.testing.assert_array_equal(order(data=data, asc=asc, nodata=nodata), expected)
np.testing.assert_array_equal(
order(data=np.array(data), asc=asc, nodata=nodata), np.array(expected)
)
np.testing.assert_array_equal(
order(data=da.from_array(np.array(data)), asc=asc, nodata=nodata),
da.from_array(np.array(expected)),
)


@pytest.mark.parametrize(
"data, order, axis, expected",
[
([5, 4, 3], [2, 1, 0], None, [3, 4, 5]),
([5, 4, 3, 2], [0, 2, 1, 3], 0, [5, 3, 4, 2]),
([5, 4, 3, 2], [1, 3], 0, [4, 2]),
([[5, 4, 3, 2], [5, 4, 3, 2]], [[1, 3]], 1, [[4, 2], [4, 2]]),
],
)
def test_rearrange(data, order, axis, expected):
np.testing.assert_array_equal(
rearrange(data=data, order=order, axis=axis), expected
)
np.testing.assert_array_equal(
rearrange(data=np.array(data), order=order, axis=axis), np.array(expected)
)
np.testing.assert_array_equal(
rearrange(data=da.from_array(np.array(data)), order=order, axis=axis),
da.from_array(np.array(expected)),
)


def test_rearrange_mismatched_shape():
with pytest.raises(ValueError):
rearrange(data=[[5, 4, 3, 2], [5, 4, 3, 2]], order=[1, 3], axis=1)


@pytest.mark.parametrize(
"data, asc, nodata, expected",
[
(
[6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9],
True,
None,
[-1, 2, 3, 4, 6, 7, 8, 9, 9],
),
(
[6, -1, 2, np.nan, 7, 4, np.nan, 8, 3, 9, 9],
False,
True,
[9, 9, 8, 7, 6, 4, 3, 2, -1, np.nan, np.nan],
),
],
)
def test_sort(data, asc, nodata, expected):
"""Tests `sort` function."""
assert np.isclose(
sort(data=data, asc=asc, nodata=nodata),
expected,
equal_nan=True,
).all()
assert np.isclose(
sort(data=np.array(data), asc=asc, nodata=nodata),
expected,
equal_nan=True,
).all()
assert np.isclose(
sort(data=da.from_array(np.array(data)), asc=asc, nodata=nodata),
expected,
equal_nan=True,
).all()


@pytest.mark.parametrize("size", [(3, 3, 2, 4)])
@pytest.mark.parametrize("dtype", [np.float32])
def test_reduce_dimension(