From 9c39a28ba2f6221ffd8327fa21cb8294f0390fee Mon Sep 17 00:00:00 2001 From: Jiao Date: Thu, 29 Sep 2022 10:54:55 -0700 Subject: [PATCH] [AIR][Numpy] Add numpy narrow waist to `Preprocessor` and `BatchMapper` (#28418) Co-authored-by: Eric Liang Co-authored-by: Clark Zinzow Co-authored-by: Amog Kamsetty --- python/ray/air/_internal/remote_storage.py | 7 +- python/ray/air/result.py | 6 +- .../air/tests/test_data_batch_conversion.py | 101 ++++- python/ray/air/tests/test_dataset_config.py | 23 +- python/ray/air/util/data_batch_conversion.py | 65 +++- python/ray/air/util/transform_pyarrow.py | 31 ++ python/ray/data/_internal/arrow_block.py | 4 +- .../_internal/arrow_ops/transform_pyarrow.py | 32 +- python/ray/data/preprocessor.py | 121 +++--- python/ray/data/preprocessors/batch_mapper.py | 55 ++- python/ray/data/tests/test_batch_mapper.py | 353 ++++++++++++++++++ python/ray/data/tests/test_preprocessors.py | 168 +++++---- 12 files changed, 805 insertions(+), 161 deletions(-) create mode 100644 python/ray/air/util/transform_pyarrow.py create mode 100644 python/ray/data/tests/test_batch_mapper.py diff --git a/python/ray/air/_internal/remote_storage.py b/python/ray/air/_internal/remote_storage.py index 3789d44788f2c..de67e5ae064db 100644 --- a/python/ray/air/_internal/remote_storage.py +++ b/python/ray/air/_internal/remote_storage.py @@ -1,7 +1,7 @@ import fnmatch import os -from packaging import version import urllib.parse +from pkg_resources import packaging from typing import List, Optional, Tuple from ray.air._internal.filelock import TempFileLock @@ -119,7 +119,10 @@ def get_fs_and_path( try: import gcsfs - if version.parse(gcsfs.__version__) > version.parse("2022.7.1"): + # For minimal install that only needs python3-setuptools + if packaging.version.parse(gcsfs.__version__) > packaging.version.parse( + "2022.7.1" + ): raise RuntimeError( "`gcsfs` versions greater than '2022.7.1' are not " f"compatible with pyarrow. 
You have gcsfs version "
diff --git a/python/ray/air/result.py b/python/ray/air/result.py
index 357bc14d39b2b..002a80b770f08 100644
--- a/python/ray/air/result.py
+++ b/python/ray/air/result.py
@@ -1,3 +1,4 @@
+from typing import TYPE_CHECKING
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
@@ -5,7 +6,8 @@
 from ray.air.checkpoint import Checkpoint
 from ray.util.annotations import PublicAPI
 
-import pandas as pd
+if TYPE_CHECKING:
+    import pandas as pd
 
 
 @dataclass
@@ -40,7 +42,7 @@ class Result:
     checkpoint: Optional[Checkpoint]
     error: Optional[Exception]
     log_dir: Optional[Path]
-    metrics_dataframe: Optional[pd.DataFrame]
+    metrics_dataframe: Optional["pd.DataFrame"]
     best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]]
 
     _items_to_repr = ["metrics", "error", "log_dir"]
diff --git a/python/ray/air/tests/test_data_batch_conversion.py b/python/ray/air/tests/test_data_batch_conversion.py
index d88533809b1d2..d7e2307a23db8 100644
--- a/python/ray/air/tests/test_data_batch_conversion.py
+++ b/python/ray/air/tests/test_data_batch_conversion.py
@@ -5,8 +5,11 @@
 import pyarrow as pa
 
 from ray.air.constants import TENSOR_COLUMN_NAME
-from ray.air.util.data_batch_conversion import convert_batch_type_to_pandas
-from ray.air.util.data_batch_conversion import convert_pandas_to_batch_type
+from ray.air.util.data_batch_conversion import (
+    convert_batch_type_to_pandas,
+    convert_pandas_to_batch_type,
+    _convert_batch_type_to_numpy,
+)
 from ray.air.util.data_batch_conversion import DataType
 from ray.air.util.tensor_extensions.pandas import TensorArray
 from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
@@ -22,6 +25,100 @@ def test_pandas_pandas():
     pd.testing.assert_frame_equal(actual_output, input_data)
 
 
+def test_numpy_to_numpy():
+    input_data = {"x": np.arange(12).reshape(3, 4)}
+    expected_output = input_data
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(actual_output["x"], expected_output["x"])
+
+    input_data = {
+        "column_1": np.arange(12).reshape(3, 4),
+        "column_2": np.arange(12).reshape(3, 4),
+    }
+    expected_output = {
+        "column_1": np.arange(12).reshape(3, 4),
+        "column_2": np.arange(12).reshape(3, 4),
+    }
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    assert actual_output.keys() == expected_output.keys()
+    np.testing.assert_array_equal(actual_output["column_1"], expected_output["column_1"])
+    np.testing.assert_array_equal(actual_output["column_2"], expected_output["column_2"])
+
+    input_data = np.arange(12).reshape(3, 4)
+    expected_output = input_data
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output, actual_output)
+
+
+def test_arrow_to_numpy():
+    input_data = pa.table({"column_1": [1, 2, 3, 4]})
+    expected_output = {"column_1": np.array([1, 2, 3, 4])}
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    assert expected_output.keys() == actual_output.keys()
+    np.testing.assert_array_equal(
+        expected_output["column_1"], actual_output["column_1"]
+    )
+
+    input_data = pa.table(
+        {
+            TENSOR_COLUMN_NAME: ArrowTensorArray.from_numpy(
+                np.arange(12).reshape(3, 2, 2)
+            )
+        }
+    )
+    expected_output = np.arange(12).reshape(3, 2, 2)
+    actual_output = _convert_batch_type_to_numpy(input_data)
+    np.testing.assert_array_equal(expected_output, actual_output)
+
+    input_data = pa.table(
+        {
+            "column_1": [1, 2, 3, 4],
+            "column_2": [1, -1, 1, -1],
+        }
+    )
+    expected_output = {
+        "column_1": np.array([1, 2, 3, 4]),
+        "column_2": np.array([1, -1,
1, -1]), + } + + actual_output = _convert_batch_type_to_numpy(input_data) + assert expected_output.keys() == actual_output.keys() + np.testing.assert_array_equal( + expected_output["column_1"], actual_output["column_1"] + ) + np.testing.assert_array_equal( + expected_output["column_2"], actual_output["column_2"] + ) + + +def test_pd_dataframe_to_numpy(): + input_data = pd.DataFrame({"column_1": [1, 2, 3, 4]}) + expected_output = np.array([1, 2, 3, 4]) + actual_output = _convert_batch_type_to_numpy(input_data) + np.testing.assert_array_equal(expected_output, actual_output) + + input_data = pd.DataFrame( + {TENSOR_COLUMN_NAME: TensorArray(np.arange(12).reshape(3, 4))} + ) + expected_output = np.arange(12).reshape(3, 4) + actual_output = _convert_batch_type_to_numpy(input_data) + np.testing.assert_array_equal(expected_output, actual_output) + + input_data = pd.DataFrame({"column_1": [1, 2, 3, 4], "column_2": [1, -1, 1, -1]}) + expected_output = { + "column_1": np.array([1, 2, 3, 4]), + "column_2": np.array([1, -1, 1, -1]), + } + actual_output = _convert_batch_type_to_numpy(input_data) + assert expected_output.keys() == actual_output.keys() + np.testing.assert_array_equal( + expected_output["column_1"], actual_output["column_1"] + ) + np.testing.assert_array_equal( + expected_output["column_2"], actual_output["column_2"] + ) + + @pytest.mark.parametrize("use_tensor_extension_for_input", [True, False]) @pytest.mark.parametrize("cast_tensor_columns", [True, False]) def test_pandas_multi_dim_pandas(cast_tensor_columns, use_tensor_extension_for_input): diff --git a/python/ray/air/tests/test_dataset_config.py b/python/ray/air/tests/test_dataset_config.py index 13b165c8929ae..4f2927559082e 100644 --- a/python/ray/air/tests/test_dataset_config.py +++ b/python/ray/air/tests/test_dataset_config.py @@ -162,11 +162,15 @@ def test_use_stream_api_config(ray_start_4_cpus): def test_fit_transform_config(ray_start_4_cpus): ds = ray.data.range_table(10) - def drop_odd(rows): + def drop_odd_pandas(rows): key = list(rows)[0] return rows[(rows[key] % 2 == 0)] - prep = BatchMapper(drop_odd) + def drop_odd_numpy(rows): + return [x for x in rows if x % 2 == 0] + + prep_pandas = BatchMapper(drop_odd_pandas, batch_format="pandas") + prep_numpy = BatchMapper(drop_odd_numpy, batch_format="numpy") # Single worker basic case. test = TestBasic( @@ -175,7 +179,18 @@ def drop_odd(rows): {"train": 5, "test": 5}, dataset_config={}, datasets={"train": ds, "test": ds}, - preprocessor=prep, + preprocessor=prep_pandas, + ) + test.fit() + + # Single worker basic case. 
+ test = TestBasic( + 1, + True, + {"train": 5, "test": 5}, + dataset_config={}, + datasets={"train": ds, "test": ds}, + preprocessor=prep_numpy, ) test.fit() @@ -186,7 +201,7 @@ def drop_odd(rows): {"train": 5, "test": 10}, dataset_config={"test": DatasetConfig(transform=False)}, datasets={"train": ds, "test": ds}, - preprocessor=prep, + preprocessor=prep_pandas, ) test.fit() diff --git a/python/ray/air/util/data_batch_conversion.py b/python/ray/air/util/data_batch_conversion.py index 56c833172f55c..c7080551af640 100644 --- a/python/ray/air/util/data_batch_conversion.py +++ b/python/ray/air/util/data_batch_conversion.py @@ -1,5 +1,5 @@ from enum import Enum, auto -from typing import Union, List +from typing import Dict, Union, List import numpy as np import pandas as pd @@ -7,6 +7,13 @@ from ray.air.data_batch_type import DataBatchType from ray.air.constants import TENSOR_COLUMN_NAME from ray.util.annotations import DeveloperAPI +from ray.air.util.tensor_extensions.arrow import ArrowTensorType + +# TODO: Consolidate data conversion edges for arrow bug workaround. +from ray.air.util.transform_pyarrow import ( + _is_column_extension_type, + _concatenate_extension_column, +) try: import pyarrow @@ -109,6 +116,62 @@ def convert_pandas_to_batch_type( ) +def _convert_batch_type_to_numpy( + data: DataBatchType, +) -> Union[np.ndarray, Dict[str, np.ndarray]]: + """Convert the provided data to a NumPy ndarray or dict of ndarrays. + + Args: + data: Data of type DataBatchType + + Returns: + A numpy representation of the input data. + """ + if isinstance(data, np.ndarray): + return data + elif isinstance(data, dict): + for col_name, col in data.items(): + if not isinstance(col, np.ndarray): + raise ValueError( + "All values in the provided dict must be of type " + f"np.ndarray. Found type {type(col)} for key {col_name} " + f"instead." + ) + return data + elif pyarrow is not None and isinstance(data, pyarrow.Table): + if data.column_names == [TENSOR_COLUMN_NAME] and ( + isinstance(data.schema.types[0], ArrowTensorType) + ): + # If representing a tensor dataset, return as a single numpy array. + # Example: ray.data.from_numpy(np.arange(12).reshape((3, 2, 2))) + # Arrow’s incorrect concatenation of extension arrays: + # https://issues.apache.org/jira/browse/ARROW-16503 + return _concatenate_extension_column(data[TENSOR_COLUMN_NAME]).to_numpy( + zero_copy_only=False + ) + else: + output_dict = {} + for col_name in data.column_names: + col = data[col_name] + if col.num_chunks == 0: + col = pyarrow.array([], type=col.type) + elif _is_column_extension_type(col): + # Arrow’s incorrect concatenation of extension arrays: + # https://issues.apache.org/jira/browse/ARROW-16503 + col = _concatenate_extension_column(col) + else: + col = col.combine_chunks() + output_dict[col_name] = col.to_numpy(zero_copy_only=False) + return output_dict + elif isinstance(data, pd.DataFrame): + return convert_pandas_to_batch_type(data, DataType.NUMPY) + else: + raise ValueError( + f"Received data of type: {type(data)}, but expected it to be one " + f"of {DataBatchType}" + ) + + def _ndarray_to_column(arr: np.ndarray) -> Union[pd.Series, List[np.ndarray]]: """Convert a NumPy ndarray into an appropriate column format for insertion into a pandas DataFrame. 
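
Usage sketch for the new `_convert_batch_type_to_numpy` helper above (illustration only, not part of the diff; assumes `numpy`, `pandas`, and `pyarrow` are installed, and mirrors the new tests in test_data_batch_conversion.py):

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    from ray.air.util.data_batch_conversion import _convert_batch_type_to_numpy

    # ndarrays and dicts of ndarrays pass through unchanged (same object back).
    batch = {"a": np.arange(4), "b": np.zeros(4)}
    assert _convert_batch_type_to_numpy(batch) is batch

    # Arrow tables become dicts keyed by column name; extension (tensor) columns
    # are concatenated manually to work around ARROW-16503.
    out = _convert_batch_type_to_numpy(pa.table({"a": [1, 2, 3]}))
    np.testing.assert_array_equal(out["a"], np.array([1, 2, 3]))

    # A single-column pandas DataFrame collapses to a bare ndarray
    # (via convert_pandas_to_batch_type), as in test_pd_dataframe_to_numpy.
    out = _convert_batch_type_to_numpy(pd.DataFrame({"a": [1, 2, 3]}))
    np.testing.assert_array_equal(out, np.array([1, 2, 3]))
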
diff --git a/python/ray/air/util/transform_pyarrow.py b/python/ray/air/util/transform_pyarrow.py
new file mode 100644
index 0000000000000..a93e2963d6288
--- /dev/null
+++ b/python/ray/air/util/transform_pyarrow.py
@@ -0,0 +1,31 @@
+try:
+    import pyarrow
+except ImportError:
+    pyarrow = None
+
+
+def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool:
+    """Whether the provided Arrow Table column is an extension array, using an Arrow
+    extension type.
+    """
+    return isinstance(ca.type, pyarrow.ExtensionType)
+
+
+def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array":
+    """Concatenate chunks of an extension column into a contiguous array.
+
+    This concatenation is required for creating copies and for .take() to work on
+    extension arrays.
+    See https://issues.apache.org/jira/browse/ARROW-16503.
+    """
+    if not _is_column_extension_type(ca):
+        raise ValueError(f"Chunked array isn't an extension array: {ca}")
+
+    if ca.num_chunks == 0:
+        # No-op for no-chunk chunked arrays, since there's nothing to concatenate.
+        return ca
+
+    chunk = ca.chunk(0)
+    return type(chunk).from_storage(
+        chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks])
+    )
diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py
index 8a1b15fc8d046..159b9aee08204 100644
--- a/python/ray/data/_internal/arrow_block.py
+++ b/python/ray/data/_internal/arrow_block.py
@@ -16,11 +16,11 @@
 
 import numpy as np
 
-from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
-from ray.data._internal.arrow_ops.transform_pyarrow import (
+from ray.air.util.transform_pyarrow import (
     _concatenate_extension_column,
     _is_column_extension_type,
 )
+from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
 from ray.data._internal.table_block import (
     VALUE_COL_NAME,
     TableBlockAccessor,
diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py
index 83f12a55556ba..08656e4ac2324 100644
--- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py
+++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py
@@ -1,5 +1,10 @@
 from typing import TYPE_CHECKING, List, Union
 
+from ray.air.util.transform_pyarrow import (
+    _is_column_extension_type,
+    _concatenate_extension_column,
+)
+
 try:
     import pyarrow
 except ImportError:
     pyarrow = None
@@ -16,33 +21,6 @@ def sort(table: "pyarrow.Table", key: "SortKeyT", descending: bool) -> "pyarrow.
     return table.take(indices)
 
 
-def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool:
-    """Whether the provided Arrow Table column is an extension array, using an Arrow
-    extension type.
-    """
-    return isinstance(ca.type, pyarrow.ExtensionType)
-
-
-def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array":
-    """Concatenate chunks of an extension column into a contiguous array.
-
-    This concatenation is required for creating copies and for .take() to work on
-    extension arrays.
-    See https://issues.apache.org/jira/browse/ARROW-16503.
-    """
-    if not _is_column_extension_type(ca):
-        raise ValueError("Chunked array isn't an extension array: {ca}")
-
-    if ca.num_chunks == 0:
-        # No-op for no-chunk chunked arrays, since there's nothing to concatenate.
-        return ca
-
-    chunk = ca.chunk(0)
-    return type(chunk).from_storage(
-        chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks])
-    )
-
-
 def take_table(
     table: "pyarrow.Table",
     indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"],
diff --git a/python/ray/data/preprocessor.py b/python/ray/data/preprocessor.py
index 55f8c21457969..cc5f59bc2c20d 100644
--- a/python/ray/data/preprocessor.py
+++ b/python/ray/data/preprocessor.py
@@ -1,14 +1,15 @@
 import abc
 import warnings
 from enum import Enum
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Union, Dict
+
+import numpy as np
 
 from ray.data import Dataset
 from ray.util.annotations import DeveloperAPI, PublicAPI
 
 if TYPE_CHECKING:
     import pandas as pd
-    import pyarrow
 
     from ray.air.data_batch_type import DataBatchType
 
@@ -38,7 +39,7 @@ class Preprocessor(abc.ABC):
     * ``_fit`` if your preprocessor is stateful. Otherwise, set
       ``_is_fittable=False``.
-    * ``_transform_pandas`` and/or ``_transform_arrow`` for best performance,
-      implement both. Otherwise, the data will be converted to the match the
+    * ``_transform_pandas`` and/or ``_transform_numpy``; for best performance,
+      implement both. Otherwise, the data will be converted to match the
       implemented method.
     """
 
@@ -145,14 +146,14 @@ def transform(self, dataset: Dataset) -> Dataset:
         self._transform_stats = transformed_ds.stats()
         return transformed_ds
 
-    def transform_batch(self, df: "DataBatchType") -> "DataBatchType":
+    def transform_batch(self, data: "DataBatchType") -> "DataBatchType":
         """Transform a single batch of data.
 
         The data will be converted to the format supported by the Preprocessor,
         based on which ``_transform_*`` method(s) are implemented.
 
         Args:
-            df: Input data batch.
+            data: Input data batch.
 
         Returns:
             DataBatchType:
@@ -168,7 +169,7 @@ def transform_batch(self, df: "DataBatchType") -> "DataBatchType":
             raise PreprocessorNotFittedException(
                 "`fit` must be called before `transform_batch`."
             )
-        return self._transform_batch(df)
+        return self._transform_batch(data)
 
     def _check_is_fitted(self) -> bool:
         """Returns whether this preprocessor is fitted.
@@ -187,36 +188,50 @@ def _fit(self, dataset: Dataset) -> "Preprocessor":
 
     def _determine_transform_to_use(self, data_format: str) -> str:
         """Determine which transform to use based on data format and implementation.
 
-        * If only _transform_arrow is implemented, will convert the data to arrow.
-        * If only _transform_pandas is implemented, will convert the data to pandas.
-          If both are implemented, will pick the method corresponding to the format
-          for best performance.
-        * Implementation is defined as overriding the method in a sub-class.
+        We infer and pick the best transform to use:
+        * The ``pandas`` data format prioritizes the ``pandas`` transform if available.
+        * The ``arrow`` and ``numpy`` data formats prioritize the ``numpy`` transform if available.  # noqa: E501
+        * Otherwise, fall back to whichever transform is implemented.
         """
-        assert data_format in ("pandas", "arrow")
-        has_transform_arrow = (
-            self.__class__._transform_arrow != Preprocessor._transform_arrow
-        )
+        assert data_format in (
+            "pandas",
+            "arrow",
+            "numpy",
+        ), f"Unsupported data format: {data_format}"
+
         has_transform_pandas = (
             self.__class__._transform_pandas != Preprocessor._transform_pandas
         )
+        has_transform_numpy = (
+            self.__class__._transform_numpy != Preprocessor._transform_numpy
+        )
 
-        if has_transform_arrow and has_transform_pandas:
-            # Both transforms available, so we delegate based on the dataset format to
-            # ensure minimal data conversions.
- if data_format == "pandas": + # Infer transform type by prioritizing native transformation to minimize + # data conversion cost. + if data_format == "pandas": + # Perform native pandas transformation if possible. + if has_transform_pandas: transform_type = "pandas" - elif data_format == "arrow": - transform_type = "arrow" - elif has_transform_pandas: - transform_type = "pandas" - elif has_transform_arrow: - transform_type = "arrow" - else: - raise NotImplementedError( - "Neither `_transform_arrow` nor `_transform_pandas` are implemented." - ) + elif has_transform_numpy: + transform_type = "numpy" + else: + raise NotImplementedError( + "None of `_transform_numpy` or `_transform_pandas` " + f"are implemented for dataset format `{data_format}`." + ) + elif data_format == "arrow" or data_format == "numpy": + # Arrow -> Numpy is more efficient + if has_transform_numpy: + transform_type = "numpy" + elif has_transform_pandas: + transform_type = "pandas" + else: + raise NotImplementedError( + "None of `_transform_numpy` or `_transform_pandas` " + f"are implemented for dataset format `{data_format}`." + ) + return transform_type def _transform(self, dataset: Dataset) -> Dataset: @@ -226,52 +241,56 @@ def _transform(self, dataset: Dataset) -> Dataset: dataset_format = dataset._dataset_format() if dataset_format not in ("pandas", "arrow"): raise ValueError( - f"Unsupported Dataset format: '{dataset_format}'. Only 'pandas' and " - "'arrow' Dataset formats are supported." + f"Unsupported Dataset format: '{dataset_format}'. Only 'pandas' " + "and 'arrow' Dataset formats are supported." ) transform_type = self._determine_transform_to_use(dataset_format) + # Our user facing batch format should only be pandas or numpy, other + # formats {arrow, simple} are internal. if transform_type == "pandas": return dataset.map_batches(self._transform_pandas, batch_format="pandas") - elif transform_type == "arrow": - return dataset.map_batches(self._transform_arrow, batch_format="pyarrow") + elif transform_type == "numpy": + return dataset.map_batches(self._transform_numpy, batch_format="numpy") else: raise ValueError( "Invalid transform type returned from _determine_transform_to_use; " - f'"pandas" and "arrow" allowed, but got: {transform_type}' + f'"pandas" and "numpy" allowed, but got: {transform_type}' ) - def _transform_batch(self, df: "DataBatchType") -> "DataBatchType": + def _transform_batch(self, data: "DataBatchType") -> "DataBatchType": + # For minimal install to locally import air modules import pandas as pd + from ray.air.util.data_batch_conversion import ( + convert_batch_type_to_pandas, + _convert_batch_type_to_numpy, + ) try: import pyarrow except ImportError: pyarrow = None - if isinstance(df, pd.DataFrame): + if isinstance(data, pd.DataFrame): data_format = "pandas" - elif pyarrow is not None and isinstance(df, pyarrow.Table): + elif pyarrow is not None and isinstance(data, pyarrow.Table): data_format = "arrow" + elif isinstance(data, (dict, np.ndarray)): + data_format = "numpy" else: raise NotImplementedError( - "`transform_batch` is currently only implemented for Pandas DataFrames " - f"and PyArrow Tables. Got {type(df)}." + "`transform_batch` is currently only implemented for Pandas " + "DataFrames, pyarrow Tables, NumPy ndarray and dictionary of " + f"ndarray. Got {type(data)}." 
) transform_type = self._determine_transform_to_use(data_format) if transform_type == "pandas": - if data_format == "pandas": - return self._transform_pandas(df) - else: - return self._transform_pandas(df.to_pandas()) - elif transform_type == "arrow": - if data_format == "arrow": - return self._transform_arrow(df) - else: - return self._transform_arrow(pyarrow.Table.from_pandas(df)) + return self._transform_pandas(convert_batch_type_to_pandas(data)) + elif transform_type == "numpy": + return self._transform_numpy(_convert_batch_type_to_numpy(data)) @DeveloperAPI def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": @@ -279,6 +298,8 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": raise NotImplementedError() @DeveloperAPI - def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table": - """Run the transformation on a data batch in a PyArrow Table format.""" + def _transform_numpy( + self, np_data: Union[np.ndarray, Dict[str, np.ndarray]] + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + """Run the transformation on a data batch in a NumPy ndarray format.""" raise NotImplementedError() diff --git a/python/ray/data/preprocessors/batch_mapper.py b/python/ray/data/preprocessors/batch_mapper.py index 90ad42771849e..55b1f176d3909 100644 --- a/python/ray/data/preprocessors/batch_mapper.py +++ b/python/ray/data/preprocessors/batch_mapper.py @@ -1,4 +1,7 @@ -from typing import Callable, TYPE_CHECKING +from typing import Dict, Callable, Optional, Union, TYPE_CHECKING +import warnings + +import numpy as np from ray.data.preprocessor import Preprocessor @@ -24,6 +27,8 @@ class BatchMapper(Preprocessor): Use :class:`BatchMapper` to apply arbitrary operations like dropping a column. >>> import pandas as pd + >>> import numpy as np + >>> from typing import Dict >>> import ray >>> from ray.data.preprocessors import BatchMapper >>> @@ -36,19 +41,65 @@ class BatchMapper(Preprocessor): >>> preprocessor = BatchMapper(fn) >>> preprocessor.transform(ds) # doctest: +SKIP Dataset(num_blocks=1, num_rows=3, schema={X: int64}) + >>> + >>> def fn_numpy(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + ... return {"X": batch["X"]} + >>> preprocessor = BatchMapper(fn_numpy, batch_format="numpy") + >>> preprocessor.transform(ds) # doctest: +SKIP + Dataset(num_blocks=1, num_rows=3, schema={X: int64}) Args: fn: The function to apply to data batches. + batch_format: The preferred batch format to use in UDF. If not given, + we will infer based on the input dataset data format. """ _is_fittable = False - def __init__(self, fn: Callable[["pandas.DataFrame"], "pandas.DataFrame"]): + def __init__( + self, + fn: Union[ + Callable[["pandas.DataFrame"], "pandas.DataFrame"], + Callable[ + [Union[np.ndarray, Dict[str, np.ndarray]]], + Union[np.ndarray, Dict[str, np.ndarray]], + ], + ], + batch_format: Optional[str] = None, + # TODO: Make batch_format required from user + # TODO: Introduce a "zero_copy" format + # TODO: We should reach consistency of args between BatchMapper and map_batches. + ): + if not batch_format: + warnings.warn( + "batch_format will be a required argument for BatchMapper in future " + "releases. 
Defaulting to 'pandas' batch format.", + DeprecationWarning, + ) + batch_format = "pandas" + if batch_format and batch_format not in [ + "pandas", + "numpy", + ]: + raise ValueError("BatchMapper only supports pandas and numpy batch format.") + + self.batch_format = batch_format self.fn = fn + def _transform_numpy( + self, np_data: Union[np.ndarray, Dict[str, np.ndarray]] + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + return self.fn(np_data) + def _transform_pandas(self, df: "pandas.DataFrame") -> "pandas.DataFrame": return self.fn(df) + def _determine_transform_to_use(self, data_format: str): + if self.batch_format: + return self.batch_format + else: + return super()._determine_transform_to_use(data_format) + def __repr__(self): fn_name = getattr(self.fn, "__name__", self.fn) return f"{self.__class__.__name__}(fn={fn_name})" diff --git a/python/ray/data/tests/test_batch_mapper.py b/python/ray/data/tests/test_batch_mapper.py new file mode 100644 index 0000000000000..ce099caf7b75f --- /dev/null +++ b/python/ray/data/tests/test_batch_mapper.py @@ -0,0 +1,353 @@ +import numpy as np +import pandas as pd +import pyarrow as pa + +import pytest +from typing import Dict, Union +from pandas.testing import assert_frame_equal + +import ray +from ray.data.preprocessors import BatchMapper +from ray.data.extensions import TensorArray +from ray.air.constants import TENSOR_COLUMN_NAME +from ray.air.util.tensor_extensions.arrow import ArrowTensorArray + + +# ===== Pandas dataset formats ===== +def ds_pandas_single_column_format(): + in_df = pd.DataFrame({"column_1": [1, 2, 3, 4]}) + ds = ray.data.from_pandas(in_df) + return ds + + +def ds_pandas_multi_column_format(): + in_df = pd.DataFrame({"column_1": [1, 2, 3, 4], "column_2": [1, -1, 1, -1]}) + ds = ray.data.from_pandas(in_df) + return ds + + +# ===== Arrow dataset formats ===== +def ds_arrow_single_column_format(): + ds = ray.data.from_arrow(pa.table({"column_1": [1, 2, 3, 4]})) + return ds + + +def ds_arrow_single_column_tensor_format(): + ds = ray.data.from_arrow( + pa.table( + { + TENSOR_COLUMN_NAME: ArrowTensorArray.from_numpy( + np.arange(12).reshape((3, 2, 2)) + ) + } + ) + ) + return ds + + +def ds_arrow_multi_column_format(): + ds = ray.data.from_arrow( + pa.table( + { + "column_1": [1, 2, 3, 4], + "column_2": [1, -1, 1, -1], + } + ) + ) + return ds + + +# ===== Numpy dataset formats ===== +def ds_numpy_single_column_tensor_format(): + ds = ray.data.from_numpy(np.arange(12).reshape((3, 2, 2))) + return ds + + +def ds_numpy_list_of_ndarray_tensor_format(): + ds = ray.data.from_numpy( + [np.arange(12).reshape((3, 2, 2)), np.arange(12, 24).reshape((3, 2, 2))] + ) + return ds + + +@pytest.mark.parametrize( + "ds_with_expected_pandas_numpy_df", + [ + ( + ds_pandas_single_column_format(), + pd.DataFrame( + { + "column_1": [2, 3, 4, 5], + } + ), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: [2, 3, 4, 5], + } + ), + ), + ( + ds_pandas_multi_column_format(), + pd.DataFrame( + { + "column_1": [2, 3, 4, 5], + "column_2": [2, -2, 2, -2], + } + ), + pd.DataFrame( + { + "column_1": [2, 3, 4, 5], + "column_2": [2, -2, 2, -2], + } + ), + ), + ], +) +def test_batch_mapper_pandas_data_format(ds_with_expected_pandas_numpy_df): + """Tests batch mapper functionality for pandas data format. + + Note: + For single column pandas dataframes, we automatically convert it to + single column tensor with column name as `__value__`. 
+ """ + ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df + + def add_and_modify_udf_pandas(df: "pd.DataFrame"): + df["column_1"] = df["column_1"] + 1 + if "column_2" in df: + df["column_2"] *= 2 + return df + + def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]): + if isinstance(data, np.ndarray): + data += 1 + else: + data["column_1"] = data["column_1"] + 1 + if "column_2" in data: + data["column_2"] *= 2 + return data + + # Test map_batches + transformed_ds = ds.map_batches(add_and_modify_udf_pandas, batch_format="pandas") + out_df_map_batches = transformed_ds.to_pandas() + assert_frame_equal(out_df_map_batches, expected_df) + + transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy") + out_df_map_batches = transformed_ds.to_pandas() + assert_frame_equal(out_df_map_batches, expected_numpy_df) + + # Test BatchMapper + batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas") + batch_mapper.fit(ds) + transformed_ds = batch_mapper.transform(ds) + out_df = transformed_ds.to_pandas() + assert_frame_equal(out_df, expected_df) + + batch_mapper = BatchMapper(fn=add_and_modify_udf_numpy, batch_format="numpy") + batch_mapper.fit(ds) + transformed_ds = batch_mapper.transform(ds) + out_df = transformed_ds.to_pandas() + assert_frame_equal(out_df, expected_numpy_df) + + +@pytest.mark.parametrize( + "ds_with_expected_pandas_numpy_df", + [ + ( + ds_arrow_single_column_format(), + pd.DataFrame( + { + "column_1": [2, 3, 4, 5], + } + ), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: [2, 3, 4, 5], + } + ), + ), + ( + ds_arrow_single_column_tensor_format(), + pd.DataFrame( + { + TENSOR_COLUMN_NAME: [ + [[1, 2], [3, 4]], + [[5, 6], [7, 8]], + [[9, 10], [11, 12]], + ] + } + ), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 13).reshape((3, 2, 2))) + } + ), + ), + ( + ds_arrow_multi_column_format(), + pd.DataFrame( + { + "column_1": [2, 3, 4, 5], + "column_2": [2, -2, 2, -2], + } + ), + pd.DataFrame( + { + "column_1": [2, 3, 4, 5], + "column_2": [2, -2, 2, -2], + } + ), + ), + ], +) +def test_batch_mapper_arrow_data_format(ds_with_expected_pandas_numpy_df): + """Tests batch mapper functionality for arrow data format. + + Note: + For single column pandas dataframes, we automatically convert it to + single column tensor with column name as `__value__`. 
+ """ + ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df + + def add_and_modify_udf_pandas(df: "pd.DataFrame"): + col_name = "column_1" + if len(df.columns) == 1: + col_name = list(df.columns)[0] + df[col_name] = df[col_name] + 1 + if "column_2" in df: + df["column_2"] *= 2 + return df + + def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]): + if isinstance(data, np.ndarray): + data = data + 1 + else: + data["column_1"] = data["column_1"] + 1 + if "column_2" in data: + data["column_2"] = data["column_2"] * 2 + return data + + # Test map_batches + transformed_ds = ds.map_batches(add_and_modify_udf_pandas, batch_format="pandas") + out_df_map_batches = transformed_ds.to_pandas() + assert_frame_equal(out_df_map_batches, expected_df) + + transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy") + out_df_map_batches = transformed_ds.to_pandas() + assert_frame_equal(out_df_map_batches, expected_numpy_df) + + # Test BatchMapper + batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas") + batch_mapper.fit(ds) + transformed_ds = batch_mapper.transform(ds) + out_df = transformed_ds.to_pandas() + assert_frame_equal(out_df, expected_df) + + batch_mapper = BatchMapper(fn=add_and_modify_udf_numpy, batch_format="numpy") + batch_mapper.fit(ds) + transformed_ds = batch_mapper.transform(ds) + out_df = transformed_ds.to_pandas() + assert_frame_equal(out_df, expected_numpy_df) + + +@pytest.mark.parametrize( + "ds_with_expected_pandas_numpy_df", + [ + ( + ds_numpy_single_column_tensor_format(), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: [ + [[1, 2], [3, 4]], + [[5, 6], [7, 8]], + [[9, 10], [11, 12]], + ] + } + ), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 13).reshape((3, 2, 2))) + } + ), + ), + ( + ds_numpy_list_of_ndarray_tensor_format(), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: [ + [[1, 2], [3, 4]], + [[5, 6], [7, 8]], + [[9, 10], [11, 12]], + [[13, 14], [15, 16]], + [[17, 18], [19, 20]], + [[21, 22], [23, 24]], + ] + } + ), + pd.DataFrame( + { + # Single column pandas automatically converts `TENSOR_COLUMN_NAME` + # In UDFs + TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 25).reshape((6, 2, 2))) + } + ), + ), + ], +) +def test_batch_mapper_numpy_data_format(ds_with_expected_pandas_numpy_df): + """Tests batch mapper functionality for numpy data format. + + Note: + For single column pandas dataframes, we automatically convert it to + single column tensor with column name as `__value__`. 
+ """ + ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df + + def add_and_modify_udf_pandas(df: "pd.DataFrame"): + col_name = list(df.columns)[0] + df[col_name] = df[col_name] + 1 + return df + + def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]): + data = data + 1 + return data + + # Test map_batches + transformed_ds = ds.map_batches(add_and_modify_udf_pandas, batch_format="pandas") + out_df_map_batches = transformed_ds.to_pandas() + assert_frame_equal(out_df_map_batches, expected_df) + + transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy") + out_df_map_batches = transformed_ds.to_pandas() + assert_frame_equal(out_df_map_batches, expected_numpy_df) + + # Test BatchMapper + batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas") + batch_mapper.fit(ds) + transformed_ds = batch_mapper.transform(ds) + out_df = transformed_ds.to_pandas() + assert_frame_equal(out_df, expected_df) + + batch_mapper = BatchMapper(fn=add_and_modify_udf_numpy, batch_format="numpy") + batch_mapper.fit(ds) + transformed_ds = batch_mapper.transform(ds) + out_df = transformed_ds.to_pandas() + assert_frame_equal(out_df, expected_numpy_df) + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/data/tests/test_preprocessors.py b/python/ray/data/tests/test_preprocessors.py index 1818aa80cb87c..d10d2f141fe1b 100644 --- a/python/ray/data/tests/test_preprocessors.py +++ b/python/ray/data/tests/test_preprocessors.py @@ -2,6 +2,7 @@ from collections import Counter import re from unittest.mock import patch +from typing import Dict, Union import numpy as np import pandas as pd @@ -44,22 +45,28 @@ class DummyPreprocessorWithPandas(DummyPreprocessorWithNothing): def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": return df - class DummyPreprocessorWithArrow(DummyPreprocessorWithNothing): - def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table": - return table + class DummyPreprocessorWithNumpy(DummyPreprocessorWithNothing): + batch_format = "numpy" - class DummyPreprocessorWithPandasAndArrow(DummyPreprocessorWithNothing): + def _transform_numpy( + self, np_data: Union[np.ndarray, Dict[str, np.ndarray]] + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + return np_data + + class DummyPreprocessorWithPandasAndNumpy(DummyPreprocessorWithNothing): def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": return df - def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table": - return table + def _transform_numpy( + self, np_data: Union[np.ndarray, Dict[str, np.ndarray]] + ) -> Union[np.ndarray, Dict[str, np.ndarray]]: + return np_data yield ( DummyPreprocessorWithNothing(), DummyPreprocessorWithPandas(), - DummyPreprocessorWithArrow(), - DummyPreprocessorWithPandasAndArrow(), + DummyPreprocessorWithNumpy(), + DummyPreprocessorWithPandasAndNumpy(), ) @@ -1132,36 +1139,6 @@ def test_normalizer(): assert out_df.equals(expected_df) -def test_batch_mapper(): - """Tests batch mapper functionality.""" - old_column = [1, 2, 3, 4] - to_be_modified = [1, -1, 1, -1] - in_df = pd.DataFrame.from_dict( - {"old_column": old_column, "to_be_modified": to_be_modified} - ) - ds = ray.data.from_pandas(in_df) - - def add_and_modify_udf(df: "pd.DataFrame"): - df["new_col"] = df["old_column"] + 1 - df["to_be_modified"] *= 2 - return df - - batch_mapper = BatchMapper(fn=add_and_modify_udf) - batch_mapper.fit(ds) - transformed = batch_mapper.transform(ds) - 
out_df = transformed.to_pandas() - - expected_df = pd.DataFrame.from_dict( - { - "old_column": old_column, - "to_be_modified": [2, -2, 2, -2], - "new_col": [2, 3, 4, 5], - } - ) - - assert out_df.equals(expected_df) - - def test_power_transformer(): """Tests basic PowerTransformer functionality.""" @@ -1592,13 +1569,13 @@ def test_simple_hash(): assert simple_hash([1, 2, "apple"], 100) == 37 -def test_arrow_pandas_support_simple_dataset(create_dummy_preprocessors): +def test_numpy_pandas_support_simple_dataset(create_dummy_preprocessors): # Case 1: simple dataset. No support ( with_nothing, with_pandas, - with_arrow, - with_pandas_and_arrow, + with_numpy, + with_pandas_and_numpy, ) = create_dummy_preprocessors ds = ray.data.range(10) @@ -1609,19 +1586,19 @@ def test_arrow_pandas_support_simple_dataset(create_dummy_preprocessors): with_pandas.transform(ds) with pytest.raises(ValueError): - with_arrow.transform(ds) + with_numpy.transform(ds) with pytest.raises(ValueError): - with_pandas_and_arrow.transform(ds) + with_pandas_and_numpy.transform(ds) -def test_arrow_pandas_support_pandas_dataset(create_dummy_preprocessors): +def test_numpy_pandas_support_pandas_dataset(create_dummy_preprocessors): # Case 2: pandas dataset ( with_nothing, with_pandas, - with_arrow, - with_pandas_and_arrow, + _, + with_pandas_and_numpy, ) = create_dummy_preprocessors df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"]) @@ -1631,18 +1608,16 @@ def test_arrow_pandas_support_pandas_dataset(create_dummy_preprocessors): assert with_pandas.transform(ds)._dataset_format() == "pandas" - assert with_arrow.transform(ds)._dataset_format() == "arrow" - - assert with_pandas_and_arrow.transform(ds)._dataset_format() == "pandas" + assert with_pandas_and_numpy.transform(ds)._dataset_format() == "pandas" -def test_arrow_pandas_support_arrow_dataset(create_dummy_preprocessors): +def test_numpy_pandas_support_arrow_dataset(create_dummy_preprocessors): # Case 3: arrow dataset ( with_nothing, with_pandas, - with_arrow, - with_pandas_and_arrow, + with_numpy, + with_pandas_and_numpy, ) = create_dummy_preprocessors df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"]) @@ -1652,18 +1627,19 @@ def test_arrow_pandas_support_arrow_dataset(create_dummy_preprocessors): assert with_pandas.transform(ds)._dataset_format() == "pandas" - assert with_arrow.transform(ds)._dataset_format() == "arrow" + assert with_numpy.transform(ds)._dataset_format() == "arrow" - assert with_pandas_and_arrow.transform(ds)._dataset_format() == "arrow" + # Auto select data_format = "arrow" -> batch_format = "numpy" for performance + assert with_pandas_and_numpy.transform(ds)._dataset_format() == "arrow" -def test_arrow_pandas_support_transform_batch_wrong_format(create_dummy_preprocessors): +def test_numpy_pandas_support_transform_batch_wrong_format(create_dummy_preprocessors): # Case 1: simple dataset. 
No support ( with_nothing, with_pandas, - with_arrow, - with_pandas_and_arrow, + with_numpy, + with_pandas_and_numpy, ) = create_dummy_preprocessors batch = [1, 2, 3] @@ -1674,51 +1650,105 @@ def test_arrow_pandas_support_transform_batch_wrong_format(create_dummy_preproce with_pandas.transform_batch(batch) with pytest.raises(NotImplementedError): - with_arrow.transform_batch(batch) + with_numpy.transform_batch(batch) with pytest.raises(NotImplementedError): - with_pandas_and_arrow.transform_batch(batch) + with_pandas_and_numpy.transform_batch(batch) -def test_arrow_pandas_support_transform_batch_pandas(create_dummy_preprocessors): +def test_numpy_pandas_support_transform_batch_pandas(create_dummy_preprocessors): # Case 2: pandas dataset ( with_nothing, with_pandas, - with_arrow, - with_pandas_and_arrow, + with_numpy, + with_pandas_and_numpy, ) = create_dummy_preprocessors df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"]) + df_single_column = pd.DataFrame([1, 2, 3], columns=["A"]) with pytest.raises(NotImplementedError): with_nothing.transform_batch(df) + with pytest.raises(NotImplementedError): + with_nothing.transform_batch(df_single_column) assert isinstance(with_pandas.transform_batch(df), pd.DataFrame) + assert isinstance(with_pandas.transform_batch(df_single_column), pd.DataFrame) - assert isinstance(with_arrow.transform_batch(df), pyarrow.Table) + assert isinstance(with_numpy.transform_batch(df), (np.ndarray, dict)) + # We can get pd.DataFrame after returning numpy data from UDF + assert isinstance(with_numpy.transform_batch(df_single_column), (np.ndarray, dict)) - assert isinstance(with_pandas_and_arrow.transform_batch(df), pd.DataFrame) + assert isinstance(with_pandas_and_numpy.transform_batch(df), pd.DataFrame) + assert isinstance( + with_pandas_and_numpy.transform_batch(df_single_column), pd.DataFrame + ) -def test_arrow_pandas_support_transform_batch_arrow(create_dummy_preprocessors): +def test_numpy_pandas_support_transform_batch_arrow(create_dummy_preprocessors): # Case 3: arrow dataset ( with_nothing, with_pandas, - with_arrow, - with_pandas_and_arrow, + with_numpy, + with_pandas_and_numpy, ) = create_dummy_preprocessors df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"]) + df_single_column = pd.DataFrame([1, 2, 3], columns=["A"]) + table = pyarrow.Table.from_pandas(df) + table_single_column = pyarrow.Table.from_pandas(df_single_column) with pytest.raises(NotImplementedError): with_nothing.transform_batch(table) + with pytest.raises(NotImplementedError): + with_nothing.transform_batch(table_single_column) assert isinstance(with_pandas.transform_batch(table), pd.DataFrame) + assert isinstance(with_pandas.transform_batch(table_single_column), pd.DataFrame) + + assert isinstance(with_numpy.transform_batch(table), (np.ndarray, dict)) + # We can get pyarrow.Table after returning numpy data from UDF + assert isinstance( + with_numpy.transform_batch(table_single_column), (np.ndarray, dict) + ) + # Auto select data_format = "arrow" -> batch_format = "numpy" for performance + assert isinstance(with_pandas_and_numpy.transform_batch(table), (np.ndarray, dict)) + # We can get pyarrow.Table after returning numpy data from UDF + assert isinstance( + with_pandas_and_numpy.transform_batch(table_single_column), (np.ndarray, dict) + ) + + +def test_numpy_pandas_support_transform_batch_tensor(create_dummy_preprocessors): + # Case 4: tensor dataset created by from numpy data directly + ( + with_nothing, + _, + with_numpy, + with_pandas_and_numpy, + ) = 
create_dummy_preprocessors + np_data = np.arange(12).reshape(3, 2, 2) + np_single_column = {"A": np.arange(12).reshape(3, 2, 2)} + np_multi_column = { + "A": np.arange(12).reshape(3, 2, 2), + "B": np.arange(12, 24).reshape(3, 2, 2), + } + + with pytest.raises(NotImplementedError): + with_nothing.transform_batch(np_data) + with pytest.raises(NotImplementedError): + with_nothing.transform_batch(np_single_column) + with pytest.raises(NotImplementedError): + with_nothing.transform_batch(np_multi_column) - assert isinstance(with_arrow.transform_batch(table), pyarrow.Table) + assert isinstance(with_numpy.transform_batch(np_data), np.ndarray) + assert isinstance(with_numpy.transform_batch(np_single_column), dict) + assert isinstance(with_numpy.transform_batch(np_multi_column), dict) - assert isinstance(with_pandas_and_arrow.transform_batch(table), pyarrow.Table) + assert isinstance(with_pandas_and_numpy.transform_batch(np_data), np.ndarray) + assert isinstance(with_pandas_and_numpy.transform_batch(np_single_column), dict) + assert isinstance(with_pandas_and_numpy.transform_batch(np_multi_column), dict) if __name__ == "__main__":
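
End-to-end sketch of the numpy path added by this PR (illustration only, not part of the diff; assumes a working `ray` installation and mirrors test_batch_mapper.py above):

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    import ray
    from ray.data.preprocessors import BatchMapper

    ds = ray.data.from_pandas(
        pd.DataFrame({"column_1": [1, 2, 3, 4], "column_2": [1, -1, 1, -1]})
    )

    # Multi-column tabular batches arrive in the UDF as Dict[str, np.ndarray].
    def add_and_modify(batch):
        batch["column_1"] = batch["column_1"] + 1
        batch["column_2"] = batch["column_2"] * 2
        return batch

    prep = BatchMapper(add_and_modify, batch_format="numpy")
    prep.fit(ds)  # No-op: BatchMapper is not fittable.
    out = prep.transform(ds).to_pandas()
    assert out["column_1"].tolist() == [2, 3, 4, 5]
    assert out["column_2"].tolist() == [2, -2, 2, -2]

    # transform_batch on an Arrow table also takes the numpy path, since
    # arrow -> numpy is the cheaper conversion; the result is a dict of ndarrays.
    out_batch = prep.transform_batch(pa.table({"column_1": [1], "column_2": [2]}))
    assert isinstance(out_batch, dict)

Omitting `batch_format` keeps the old behavior: a DeprecationWarning is emitted and the preprocessor defaults to the pandas path.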