[AIR][Numpy] Add numpy narrow waist to Preprocessor and `BatchMapper` (#28418)

Co-authored-by: Eric Liang <ekhliang@gmail.com>
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
Co-authored-by: Amog Kamsetty <amogkamsetty@yahoo.com>
4 people authored Sep 29, 2022
1 parent 6b7eda1 commit 9c39a28
Showing 12 changed files with 805 additions and 161 deletions.
7 changes: 5 additions & 2 deletions python/ray/air/_internal/remote_storage.py
@@ -1,7 +1,7 @@
 import fnmatch
 import os
-from packaging import version
 import urllib.parse
+from pkg_resources import packaging
 from typing import List, Optional, Tuple
 
 from ray.air._internal.filelock import TempFileLock
@@ -119,7 +119,10 @@ def get_fs_and_path(
     try:
         import gcsfs
 
-        if version.parse(gcsfs.__version__) > version.parse("2022.7.1"):
+        # For minimal install that only needs python3-setuptools
+        if packaging.version.parse(gcsfs.__version__) > packaging.version.parse(
+            "2022.7.1"
+        ):
             raise RuntimeError(
                 "`gcsfs` versions greater than '2022.7.1' are not "
                 f"compatible with pyarrow. You have gcsfs version "
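For context, `pkg_resources` ships with `setuptools`, so its vendored `packaging` module is importable even on minimal installs that lack the standalone `packaging` distribution; that is what the comment in the hunk above refers to. A minimal sketch of the version-gate pattern, assuming `gcsfs` is installed (the completed error message is illustrative, since the diff truncates it):

```python
# Version-gate an optional dependency with setuptools' vendored `packaging`,
# which minimal installs (python3-setuptools only) already provide.
from pkg_resources import packaging

import gcsfs

if packaging.version.parse(gcsfs.__version__) > packaging.version.parse("2022.7.1"):
    raise RuntimeError(
        "`gcsfs` versions greater than '2022.7.1' are not compatible with "
        f"pyarrow; found gcsfs {gcsfs.__version__}."  # illustrative message
    )
```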
6 changes: 4 additions & 2 deletions python/ray/air/result.py
@@ -1,11 +1,13 @@
+from typing import TYPE_CHECKING
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
 
 from ray.air.checkpoint import Checkpoint
 from ray.util.annotations import PublicAPI
 
-import pandas as pd
+if TYPE_CHECKING:
+    import pandas as pd
 
 
 @dataclass
@@ -40,7 +42,7 @@ class Result:
     checkpoint: Optional[Checkpoint]
     error: Optional[Exception]
     log_dir: Optional[Path]
-    metrics_dataframe: Optional[pd.DataFrame]
+    metrics_dataframe: Optional["pd.DataFrame"]
     best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]]
     _items_to_repr = ["metrics", "error", "log_dir"]

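The `TYPE_CHECKING` guard above defers the pandas import to static type-checking time, so importing `ray.air.result` no longer pulls in pandas at runtime. A minimal sketch of the pattern, with a hypothetical `Report` class standing in for `Result`:

```python
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Seen only by static type checkers; pandas is never imported at runtime.
    import pandas as pd


@dataclass
class Report:  # hypothetical stand-in for ray.air.result.Result
    # The quoted annotation stays a plain string at runtime, so constructing
    # a Report works even if pandas is not installed.
    metrics_dataframe: Optional["pd.DataFrame"] = None


print(Report())  # Report(metrics_dataframe=None)
```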
101 changes: 99 additions & 2 deletions python/ray/air/tests/test_data_batch_conversion.py
@@ -5,8 +5,11 @@
 import pyarrow as pa
 
 from ray.air.constants import TENSOR_COLUMN_NAME
-from ray.air.util.data_batch_conversion import convert_batch_type_to_pandas
-from ray.air.util.data_batch_conversion import convert_pandas_to_batch_type
+from ray.air.util.data_batch_conversion import (
+    convert_batch_type_to_pandas,
+    convert_pandas_to_batch_type,
+    _convert_batch_type_to_numpy,
+)
 from ray.air.util.data_batch_conversion import DataType
 from ray.air.util.tensor_extensions.pandas import TensorArray
 from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
@@ -22,6 +25,100 @@ def test_pandas_pandas():
     pd.testing.assert_frame_equal(actual_output, input_data)
 
 
+def test_numpy_to_numpy():
+    input_data = {"x": np.arange(12).reshape(3, 4)}
+    expected_output = input_data
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output["x"], actual_output["x"])
+
+    input_data = {
+        "column_1": np.arange(12).reshape(3, 4),
+        "column_2": np.arange(12).reshape(3, 4),
+    }
+    expected_output = {
+        "column_1": np.arange(12).reshape(3, 4),
+        "column_2": np.arange(12).reshape(3, 4),
+    }
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    assert actual_output.keys() == expected_output.keys()
+    np.testing.assert_array_equal(actual_output["column_1"], expected_output["column_1"])
+    np.testing.assert_array_equal(actual_output["column_2"], expected_output["column_2"])
+
+    input_data = np.arange(12).reshape(3, 4)
+    expected_output = input_data
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output, actual_output)
+
+
+def test_arrow_to_numpy():
+    input_data = pa.table({"column_1": [1, 2, 3, 4]})
+    expected_output = {"column_1": np.array([1, 2, 3, 4])}
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    assert expected_output.keys() == actual_output.keys()
+    np.testing.assert_array_equal(
+        expected_output["column_1"], actual_output["column_1"]
+    )
+
+    input_data = pa.table(
+        {
+            TENSOR_COLUMN_NAME: ArrowTensorArray.from_numpy(
+                np.arange(12).reshape(3, 2, 2)
+            )
+        }
+    )
+    expected_output = np.arange(12).reshape(3, 2, 2)
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output, actual_output)
+
+    input_data = pa.table(
+        {
+            "column_1": [1, 2, 3, 4],
+            "column_2": [1, -1, 1, -1],
+        }
+    )
+    expected_output = {
+        "column_1": np.array([1, 2, 3, 4]),
+        "column_2": np.array([1, -1, 1, -1]),
+    }
+
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    assert expected_output.keys() == actual_output.keys()
+    np.testing.assert_array_equal(
+        expected_output["column_1"], actual_output["column_1"]
+    )
+    np.testing.assert_array_equal(
+        expected_output["column_2"], actual_output["column_2"]
+    )
+
+
+def test_pd_dataframe_to_numpy():
+    input_data = pd.DataFrame({"column_1": [1, 2, 3, 4]})
+    expected_output = np.array([1, 2, 3, 4])
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output, actual_output)
+
+    input_data = pd.DataFrame(
+        {TENSOR_COLUMN_NAME: TensorArray(np.arange(12).reshape(3, 4))}
+    )
+    expected_output = np.arange(12).reshape(3, 4)
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output, actual_output)
+
+    input_data = pd.DataFrame({"column_1": [1, 2, 3, 4], "column_2": [1, -1, 1, -1]})
+    expected_output = {
+        "column_1": np.array([1, 2, 3, 4]),
+        "column_2": np.array([1, -1, 1, -1]),
+    }
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    assert expected_output.keys() == actual_output.keys()
+    np.testing.assert_array_equal(
+        expected_output["column_1"], actual_output["column_1"]
+    )
+    np.testing.assert_array_equal(
+        expected_output["column_2"], actual_output["column_2"]
+    )
+
+
 @pytest.mark.parametrize("use_tensor_extension_for_input", [True, False])
 @pytest.mark.parametrize("cast_tensor_columns", [True, False])
 def test_pandas_multi_dim_pandas(cast_tensor_columns, use_tensor_extension_for_input):
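Taken together, these tests pin down the conversion contract: single-column batches collapse to one ndarray, and multi-column batches become a dict of ndarrays. A standalone sketch of that contract (`_convert_batch_type_to_numpy` is a private helper and may change):

```python
import numpy as np
import pandas as pd
import pyarrow as pa

from ray.air.util.data_batch_conversion import _convert_batch_type_to_numpy

# A single-column batch collapses to one ndarray...
single = _convert_batch_type_to_numpy(pd.DataFrame({"a": [1, 2, 3]}))
assert isinstance(single, np.ndarray)

# ...while a multi-column batch becomes a dict of column name -> ndarray.
multi = _convert_batch_type_to_numpy(pa.table({"a": [1, 2], "b": [3, 4]}))
np.testing.assert_array_equal(multi["a"], np.array([1, 2]))
```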
23 changes: 19 additions & 4 deletions python/ray/air/tests/test_dataset_config.py
@@ -162,11 +162,15 @@ def test_use_stream_api_config(ray_start_4_cpus):
 def test_fit_transform_config(ray_start_4_cpus):
     ds = ray.data.range_table(10)
 
-    def drop_odd(rows):
+    def drop_odd_pandas(rows):
         key = list(rows)[0]
         return rows[(rows[key] % 2 == 0)]
 
-    prep = BatchMapper(drop_odd)
+    def drop_odd_numpy(rows):
+        return [x for x in rows if x % 2 == 0]
+
+    prep_pandas = BatchMapper(drop_odd_pandas, batch_format="pandas")
+    prep_numpy = BatchMapper(drop_odd_numpy, batch_format="numpy")
 
     # Single worker basic case.
     test = TestBasic(
@@ -175,7 +179,18 @@ def drop_odd(rows):
{"train": 5, "test": 5},
dataset_config={},
datasets={"train": ds, "test": ds},
preprocessor=prep,
preprocessor=prep_pandas,
)
test.fit()

# Single worker basic case.
test = TestBasic(
1,
True,
{"train": 5, "test": 5},
dataset_config={},
datasets={"train": ds, "test": ds},
preprocessor=prep_numpy,
)
test.fit()

@@ -186,7 +201,7 @@ def drop_odd(rows):
{"train": 5, "test": 10},
dataset_config={"test": DatasetConfig(transform=False)},
datasets={"train": ds, "test": ds},
preprocessor=prep,
preprocessor=prep_pandas,
)
test.fit()

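The new `batch_format` argument selects which representation the UDF receives. A hedged sketch of the two styles outside the `TestBasic` harness (the UDF names are illustrative):

```python
import pandas as pd

import ray
from ray.data.preprocessors import BatchMapper

ds = ray.data.range_table(10)


def drop_odd_pandas(batch: pd.DataFrame) -> pd.DataFrame:
    # The pandas path hands the UDF a DataFrame.
    key = list(batch)[0]
    return batch[batch[key] % 2 == 0]


def drop_odd_numpy(batch):
    # The numpy path hands the UDF an ndarray for this single-column dataset.
    return [x for x in batch if x % 2 == 0]


prep_pandas = BatchMapper(drop_odd_pandas, batch_format="pandas")
prep_numpy = BatchMapper(drop_odd_numpy, batch_format="numpy")

# Either preprocessor drops the odd rows, leaving 5 of the 10.
print(prep_pandas.fit_transform(ds).count())
```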
65 changes: 64 additions & 1 deletion python/ray/air/util/data_batch_conversion.py
@@ -1,12 +1,19 @@
 from enum import Enum, auto
-from typing import Union, List
+from typing import Dict, Union, List
 
 import numpy as np
 import pandas as pd
 
 from ray.air.data_batch_type import DataBatchType
 from ray.air.constants import TENSOR_COLUMN_NAME
 from ray.util.annotations import DeveloperAPI
+from ray.air.util.tensor_extensions.arrow import ArrowTensorType
+
+# TODO: Consolidate data conversion edges for arrow bug workaround.
+from ray.air.util.transform_pyarrow import (
+    _is_column_extension_type,
+    _concatenate_extension_column,
+)
 
 try:
     import pyarrow
@@ -109,6 +116,62 @@ def convert_pandas_to_batch_type(
     )
 
 
+def _convert_batch_type_to_numpy(
+    data: DataBatchType,
+) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+    """Convert the provided data to a NumPy ndarray or dict of ndarrays.
+
+    Args:
+        data: Data of type DataBatchType.
+
+    Returns:
+        A numpy representation of the input data.
+    """
+    if isinstance(data, np.ndarray):
+        return data
+    elif isinstance(data, dict):
+        for col_name, col in data.items():
+            if not isinstance(col, np.ndarray):
+                raise ValueError(
+                    "All values in the provided dict must be of type "
+                    f"np.ndarray. Found type {type(col)} for key {col_name} "
+                    f"instead."
+                )
+        return data
+    elif pyarrow is not None and isinstance(data, pyarrow.Table):
+        if data.column_names == [TENSOR_COLUMN_NAME] and (
+            isinstance(data.schema.types[0], ArrowTensorType)
+        ):
+            # If representing a tensor dataset, return as a single numpy array.
+            # Example: ray.data.from_numpy(np.arange(12).reshape((3, 2, 2)))
+            # Workaround for Arrow's incorrect concatenation of extension arrays:
+            # https://issues.apache.org/jira/browse/ARROW-16503
+            return _concatenate_extension_column(data[TENSOR_COLUMN_NAME]).to_numpy(
+                zero_copy_only=False
+            )
+        else:
+            output_dict = {}
+            for col_name in data.column_names:
+                col = data[col_name]
+                if col.num_chunks == 0:
+                    col = pyarrow.array([], type=col.type)
+                elif _is_column_extension_type(col):
+                    # Workaround for Arrow's incorrect concatenation of extension
+                    # arrays: https://issues.apache.org/jira/browse/ARROW-16503
+                    col = _concatenate_extension_column(col)
+                else:
+                    col = col.combine_chunks()
+                output_dict[col_name] = col.to_numpy(zero_copy_only=False)
+            return output_dict
+    elif isinstance(data, pd.DataFrame):
+        return convert_pandas_to_batch_type(data, DataType.NUMPY)
+    else:
+        raise ValueError(
+            f"Received data of type: {type(data)}, but expected it to be one "
+            f"of {DataBatchType}"
+        )
+
+
 def _ndarray_to_column(arr: np.ndarray) -> Union[pd.Series, List[np.ndarray]]:
     """Convert a NumPy ndarray into an appropriate column format for insertion into a
     pandas DataFrame.
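`_convert_batch_type_to_numpy` is the numpy narrow waist itself: ndarrays and dicts of ndarrays pass through, Arrow tables fan out per column, and a single-tensor-column table collapses to one stacked ndarray. A sketch of that tensor fast path, mirroring the test above:

```python
import numpy as np
import pyarrow as pa

from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.util.data_batch_conversion import _convert_batch_type_to_numpy
from ray.air.util.tensor_extensions.arrow import ArrowTensorArray

# A tensor dataset block is an Arrow table with one extension-typed column;
# the fast path above returns it as a single stacked ndarray.
table = pa.table(
    {TENSOR_COLUMN_NAME: ArrowTensorArray.from_numpy(np.arange(12).reshape(3, 2, 2))}
)
out = _convert_batch_type_to_numpy(table)
assert out.shape == (3, 2, 2)
```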
31 changes: 31 additions & 0 deletions python/ray/air/util/transform_pyarrow.py
@@ -0,0 +1,31 @@
+try:
+    import pyarrow
+except ImportError:
+    pyarrow = None
+
+
+def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool:
+    """Whether the provided Arrow Table column is an extension array, using an Arrow
+    extension type.
+    """
+    return isinstance(ca.type, pyarrow.ExtensionType)
+
+
+def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array":
+    """Concatenate chunks of an extension column into a contiguous array.
+
+    This concatenation is required for creating copies and for .take() to work on
+    extension arrays.
+    See https://issues.apache.org/jira/browse/ARROW-16503.
+    """
+    if not _is_column_extension_type(ca):
+        raise ValueError(f"Chunked array isn't an extension array: {ca}")
+
+    if ca.num_chunks == 0:
+        # No-op for no-chunk chunked arrays, since there's nothing to concatenate.
+        return ca
+
+    chunk = ca.chunk(0)
+    return type(chunk).from_storage(
+        chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks])
+    )
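These helpers now live under `ray.air` so that both AIR and Datasets can share them. A sketch of what the concatenation workaround does, assuming an `ArrowTensorArray` extension column as in the tests above:

```python
import numpy as np
import pyarrow as pa

from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
from ray.air.util.transform_pyarrow import (
    _concatenate_extension_column,
    _is_column_extension_type,
)

# Concatenating two single-chunk tables yields a two-chunk extension column.
arr = ArrowTensorArray.from_numpy(np.arange(8).reshape(2, 2, 2))
col = pa.concat_tables([pa.table({"x": arr}), pa.table({"x": arr})])["x"]

assert _is_column_extension_type(col)
# pyarrow mishandles direct concatenation of extension chunks (ARROW-16503),
# so the helper concatenates the underlying storage arrays and rewraps them.
merged = _concatenate_extension_column(col)
assert len(merged) == 4
```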
4 changes: 2 additions & 2 deletions python/ray/data/_internal/arrow_block.py
Expand Up @@ -16,11 +16,11 @@

 import numpy as np
 
-from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
-from ray.data._internal.arrow_ops.transform_pyarrow import (
+from ray.air.util.transform_pyarrow import (
     _concatenate_extension_column,
     _is_column_extension_type,
 )
+from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
 from ray.data._internal.table_block import (
     VALUE_COL_NAME,
     TableBlockAccessor,
32 changes: 5 additions & 27 deletions python/ray/data/_internal/arrow_ops/transform_pyarrow.py
@@ -1,5 +1,10 @@
 from typing import TYPE_CHECKING, List, Union
 
+from ray.air.util.transform_pyarrow import (
+    _is_column_extension_type,
+    _concatenate_extension_column,
+)
+
 try:
     import pyarrow
 except ImportError:
@@ -16,33 +21,6 @@ def sort(table: "pyarrow.Table", key: "SortKeyT", descending: bool) -> "pyarrow.
     return table.take(indices)
 
 
-def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool:
-    """Whether the provided Arrow Table column is an extension array, using an Arrow
-    extension type.
-    """
-    return isinstance(ca.type, pyarrow.ExtensionType)
-
-
-def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array":
-    """Concatenate chunks of an extension column into a contiguous array.
-    This concatenation is required for creating copies and for .take() to work on
-    extension arrays.
-    See https://issues.apache.org/jira/browse/ARROW-16503.
-    """
-    if not _is_column_extension_type(ca):
-        raise ValueError("Chunked array isn't an extension array: {ca}")
-
-    if ca.num_chunks == 0:
-        # No-op for no-chunk chunked arrays, since there's nothing to concatenate.
-        return ca
-
-    chunk = ca.chunk(0)
-    return type(chunk).from_storage(
-        chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks])
-    )
-
-
 def take_table(
     table: "pyarrow.Table",
     indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"],