diff --git a/python/ray/air/_internal/remote_storage.py b/python/ray/air/_internal/remote_storage.py
index 1762c2881b6a..ca90fa2023ef 100644
--- a/python/ray/air/_internal/remote_storage.py
+++ b/python/ray/air/_internal/remote_storage.py
@@ -1,7 +1,7 @@
 import fnmatch
 import os
+from packaging import version
 import urllib.parse
-from pkg_resources import packaging
 from typing import List, Optional, Tuple
 
 from ray.air._internal.filelock import TempFileLock
@@ -119,10 +119,7 @@ def get_fs_and_path(
     try:
         import gcsfs
 
-        # For minimal install that only needs python3-setuptools
-        if packaging.version.parse(gcsfs.__version__) > packaging.version.parse(
-            "2022.7.1"
-        ):
+        if version.parse(gcsfs.__version__) > version.parse("2022.7.1"):
             raise RuntimeError(
                 "`gcsfs` versions greater than '2022.7.1' are not "
                 f"compatible with pyarrow. You have gcsfs version "
diff --git a/python/ray/air/result.py b/python/ray/air/result.py
index 002a80b770f0..357bc14d39b2 100644
--- a/python/ray/air/result.py
+++ b/python/ray/air/result.py
@@ -1,4 +1,3 @@
-from typing import TYPE_CHECKING
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
@@ -6,8 +5,7 @@
 from ray.air.checkpoint import Checkpoint
 from ray.util.annotations import PublicAPI
 
-if TYPE_CHECKING:
-    import pandas as pd
+import pandas as pd
 
 
 @dataclass
@@ -42,7 +40,7 @@ class Result:
     checkpoint: Optional[Checkpoint]
    error: Optional[Exception]
    log_dir: Optional[Path]
-    metrics_dataframe: Optional["pd.DataFrame"]
+    metrics_dataframe: Optional[pd.DataFrame]
     best_checkpoints: Optional[List[Tuple[Checkpoint, Dict[str, Any]]]]
 
     _items_to_repr = ["metrics", "error", "log_dir"]
diff --git a/python/ray/air/tests/test_data_batch_conversion.py b/python/ray/air/tests/test_data_batch_conversion.py
index d7e2307a23db..d88533809b1d 100644
--- a/python/ray/air/tests/test_data_batch_conversion.py
+++ b/python/ray/air/tests/test_data_batch_conversion.py
@@ -5,11 +5,8 @@
 import pyarrow as pa
 
 from ray.air.constants import TENSOR_COLUMN_NAME
-from ray.air.util.data_batch_conversion import (
-    convert_batch_type_to_pandas,
-    convert_pandas_to_batch_type,
-    _convert_batch_type_to_numpy,
-)
+from ray.air.util.data_batch_conversion import convert_batch_type_to_pandas
+from ray.air.util.data_batch_conversion import convert_pandas_to_batch_type
 from ray.air.util.data_batch_conversion import DataType
 from ray.air.util.tensor_extensions.pandas import TensorArray
 from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
@@ -25,100 +22,6 @@ def test_pandas_pandas():
     pd.testing.assert_frame_equal(actual_output, input_data)
 
 
-def test_numpy_to_numpy():
-    input_data = {"x": np.arange(12).reshape(3, 4)}
-    expected_output = input_data
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    assert expected_output == actual_output
-
-    input_data = {
-        "column_1": np.arange(12).reshape(3, 4),
-        "column_2": np.arange(12).reshape(3, 4),
-    }
-    expected_output = {
-        "column_1": np.arange(12).reshape(3, 4),
-        "column_2": np.arange(12).reshape(3, 4),
-    }
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    assert input_data.keys() == expected_output.keys()
-    np.testing.assert_array_equal(input_data["column_1"], expected_output["column_1"])
-    np.testing.assert_array_equal(input_data["column_2"], expected_output["column_2"])
-
-    input_data = np.arange(12).reshape(3, 4)
-    expected_output = input_data
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    np.testing.assert_array_equal(expected_output, actual_output)
-
-
-def test_arrow_to_numpy():
-    input_data = pa.table({"column_1": [1, 2, 3, 4]})
-    expected_output = {"column_1": np.array([1, 2, 3, 4])}
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    assert expected_output.keys() == actual_output.keys()
-    np.testing.assert_array_equal(
-        expected_output["column_1"], actual_output["column_1"]
-    )
-
-    input_data = pa.table(
-        {
-            TENSOR_COLUMN_NAME: ArrowTensorArray.from_numpy(
-                np.arange(12).reshape(3, 2, 2)
-            )
-        }
-    )
-    expected_output = np.arange(12).reshape(3, 2, 2)
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    np.testing.assert_array_equal(expected_output, actual_output)
-
-    input_data = pa.table(
-        {
-            "column_1": [1, 2, 3, 4],
-            "column_2": [1, -1, 1, -1],
-        }
-    )
-    expected_output = {
-        "column_1": np.array([1, 2, 3, 4]),
-        "column_2": np.array([1, -1, 1, -1]),
-    }
-
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    assert expected_output.keys() == actual_output.keys()
-    np.testing.assert_array_equal(
-        expected_output["column_1"], actual_output["column_1"]
-    )
-    np.testing.assert_array_equal(
-        expected_output["column_2"], actual_output["column_2"]
-    )
-
-
-def test_pd_dataframe_to_numpy():
-    input_data = pd.DataFrame({"column_1": [1, 2, 3, 4]})
-    expected_output = np.array([1, 2, 3, 4])
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    np.testing.assert_array_equal(expected_output, actual_output)
-
-    input_data = pd.DataFrame(
-        {TENSOR_COLUMN_NAME: TensorArray(np.arange(12).reshape(3, 4))}
-    )
-    expected_output = np.arange(12).reshape(3, 4)
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    np.testing.assert_array_equal(expected_output, actual_output)
-
-    input_data = pd.DataFrame({"column_1": [1, 2, 3, 4], "column_2": [1, -1, 1, -1]})
-    expected_output = {
-        "column_1": np.array([1, 2, 3, 4]),
-        "column_2": np.array([1, -1, 1, -1]),
-    }
-    actual_output = _convert_batch_type_to_numpy(input_data)
-    assert expected_output.keys() == actual_output.keys()
-    np.testing.assert_array_equal(
-        expected_output["column_1"], actual_output["column_1"]
-    )
-    np.testing.assert_array_equal(
-        expected_output["column_2"], actual_output["column_2"]
-    )
-
-
 @pytest.mark.parametrize("use_tensor_extension_for_input", [True, False])
 @pytest.mark.parametrize("cast_tensor_columns", [True, False])
 def test_pandas_multi_dim_pandas(cast_tensor_columns, use_tensor_extension_for_input):
diff --git a/python/ray/air/tests/test_dataset_config.py b/python/ray/air/tests/test_dataset_config.py
index 931d2ed34798..e56a0f8e3c0a 100644
--- a/python/ray/air/tests/test_dataset_config.py
+++ b/python/ray/air/tests/test_dataset_config.py
@@ -162,15 +162,11 @@ def test_use_stream_api_config(ray_start_4_cpus):
 def test_fit_transform_config(ray_start_4_cpus):
     ds = ray.data.range_table(10)
 
-    def drop_odd_pandas(rows):
+    def drop_odd(rows):
         key = list(rows)[0]
         return rows[(rows[key] % 2 == 0)]
 
-    def drop_odd_numpy(rows):
-        return [x for x in rows if x % 2 == 0]
-
-    prep_pandas = BatchMapper(drop_odd_pandas, batch_format="pandas")
-    prep_numpy = BatchMapper(drop_odd_numpy, batch_format="numpy")
+    prep = BatchMapper(drop_odd)
 
     # Single worker basic case.
     test = TestBasic(
@@ -179,18 +175,7 @@ def drop_odd_numpy(rows):
         {"train": 5, "test": 5},
         dataset_config={},
         datasets={"train": ds, "test": ds},
-        preprocessor=prep_pandas,
-    )
-    test.fit()
-
-    # Single worker basic case.
-    test = TestBasic(
-        1,
-        True,
-        {"train": 5, "test": 5},
-        dataset_config={},
-        datasets={"train": ds, "test": ds},
-        preprocessor=prep_numpy,
+        preprocessor=prep,
     )
     test.fit()
 
@@ -201,7 +186,7 @@ def drop_odd_numpy(rows):
         {"train": 5, "test": 10},
         dataset_config={"test": DatasetConfig(transform=False)},
         datasets={"train": ds, "test": ds},
-        preprocessor=prep_pandas,
+        preprocessor=prep,
     )
     test.fit()
 
diff --git a/python/ray/air/util/data_batch_conversion.py b/python/ray/air/util/data_batch_conversion.py
index c640faab69e3..f9eb1888d581 100644
--- a/python/ray/air/util/data_batch_conversion.py
+++ b/python/ray/air/util/data_batch_conversion.py
@@ -1,5 +1,5 @@
 from enum import Enum, auto
-from typing import Dict, Union, List
+from typing import Union, List
 
 import numpy as np
 import pandas as pd
@@ -7,13 +7,6 @@
 from ray.air.data_batch_type import DataBatchType
 from ray.air.constants import TENSOR_COLUMN_NAME
 from ray.util.annotations import DeveloperAPI
-from ray.air.util.tensor_extensions.arrow import ArrowTensorType
-
-# TODO: Consolidate data conversion edges for arrow bug workaround.
-from ray.air.util.transform_pyarrow import (
-    _is_column_extension_type,
-    _concatenate_extension_column,
-)
 
 try:
     import pyarrow
@@ -116,62 +109,6 @@ def convert_pandas_to_batch_type(
     )
 
 
-def _convert_batch_type_to_numpy(
-    data: DataBatchType,
-) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-    """Convert the provided data to a NumPy ndarray or dict of ndarrays.
-
-    Args:
-        data: Data of type DataBatchType
-
-    Returns:
-        A numpy representation of the input data.
-    """
-    if isinstance(data, np.ndarray):
-        return data
-    elif isinstance(data, dict):
-        for col_name, col in data.items():
-            if not isinstance(col, np.ndarray):
-                raise ValueError(
-                    "All values in the provided dict must be of type "
-                    f"np.ndarray. Found type {type(col)} for key {col_name} "
-                    f"instead."
-                )
-        return data
-    elif pyarrow is not None and isinstance(data, pyarrow.Table):
-        if data.column_names == [TENSOR_COLUMN_NAME] and (
-            isinstance(data.schema.types[0], ArrowTensorType)
-        ):
-            # If representing a tensor dataset, return as a single numpy array.
-            # Example: ray.data.from_numpy(np.arange(12).reshape((3, 2, 2)))
-            # Arrow's incorrect concatenation of extension arrays:
-            # https://issues.apache.org/jira/browse/ARROW-16503
-            return _concatenate_extension_column(data[TENSOR_COLUMN_NAME]).to_numpy(
-                zero_copy_only=False
-            )
-        else:
-            output_dict = {}
-            for col_name in data.column_names:
-                col = data[col_name]
-                if col.num_chunks == 0:
-                    col = pyarrow.array([], type=col.type)
-                elif _is_column_extension_type(col):
-                    # Arrow's incorrect concatenation of extension arrays:
-                    # https://issues.apache.org/jira/browse/ARROW-16503
-                    col = _concatenate_extension_column(col)
-                else:
-                    col = col.combine_chunks()
-                output_dict[col_name] = col.to_numpy(zero_copy_only=False)
-            return output_dict
-    elif isinstance(data, pd.DataFrame):
-        return convert_pandas_to_batch_type(data, DataType.NUMPY)
-    else:
-        raise ValueError(
-            f"Received data of type: {type(data)}, but expected it to be one "
-            f"of {DataBatchType}"
-        )
-
-
 def _ndarray_to_column(arr: np.ndarray) -> Union[pd.Series, List[np.ndarray]]:
     """Convert a NumPy ndarray into an appropriate column format for insertion
     into a pandas DataFrame.
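[Reviewer note] With `_convert_batch_type_to_numpy` removed above, the pandas converters are the only conversion edge left in `ray.air.util.data_batch_conversion`. A minimal sketch of the surviving round trip, assuming a build with this revert applied; the column name is illustrative, and the positional `DataType` argument mirrors the call site visible in the deleted code above:

```python
import numpy as np
import pandas as pd

from ray.air.util.data_batch_conversion import (
    DataType,
    convert_batch_type_to_pandas,
    convert_pandas_to_batch_type,
)

# A dict-of-ndarrays batch is normalized to a pandas DataFrame...
batch = {"column_1": np.array([1, 2, 3, 4])}
df = convert_batch_type_to_pandas(batch)
assert isinstance(df, pd.DataFrame)

# ...and can be converted back out to the requested batch type.
restored = convert_pandas_to_batch_type(df, DataType.NUMPY)
```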
diff --git a/python/ray/air/util/transform_pyarrow.py b/python/ray/air/util/transform_pyarrow.py
deleted file mode 100644
index a93e2963d628..000000000000
--- a/python/ray/air/util/transform_pyarrow.py
+++ /dev/null
@@ -1,31 +0,0 @@
-try:
-    import pyarrow
-except ImportError:
-    pyarrow = None
-
-
-def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool:
-    """Whether the provided Arrow Table column is an extension array, using an Arrow
-    extension type.
-    """
-    return isinstance(ca.type, pyarrow.ExtensionType)
-
-
-def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array":
-    """Concatenate chunks of an extension column into a contiguous array.
-
-    This concatenation is required for creating copies and for .take() to work on
-    extension arrays.
-    See https://issues.apache.org/jira/browse/ARROW-16503.
-    """
-    if not _is_column_extension_type(ca):
-        raise ValueError("Chunked array isn't an extension array: {ca}")
-
-    if ca.num_chunks == 0:
-        # No-op for no-chunk chunked arrays, since there's nothing to concatenate.
-        return ca
-
-    chunk = ca.chunk(0)
-    return type(chunk).from_storage(
-        chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks])
-    )
diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py
index 0f3e04bf0fcd..648fa0b302c8 100644
--- a/python/ray/data/_internal/arrow_block.py
+++ b/python/ray/data/_internal/arrow_block.py
@@ -16,11 +16,11 @@
 
 import numpy as np
 
-from ray.air.util.transform_pyarrow import (
+from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
+from ray.data._internal.arrow_ops.transform_pyarrow import (
     _concatenate_extension_column,
     _is_column_extension_type,
 )
-from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
 from ray.data._internal.table_block import (
     VALUE_COL_NAME,
     TableBlockAccessor,
diff --git a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py
index 08656e4ac232..83f12a55556b 100644
--- a/python/ray/data/_internal/arrow_ops/transform_pyarrow.py
+++ b/python/ray/data/_internal/arrow_ops/transform_pyarrow.py
@@ -1,10 +1,5 @@
 from typing import TYPE_CHECKING, List, Union
 
-from ray.air.util.transform_pyarrow import (
-    _is_column_extension_type,
-    _concatenate_extension_column,
-)
-
 try:
     import pyarrow
 except ImportError:
@@ -21,6 +16,33 @@ def sort(table: "pyarrow.Table", key: "SortKeyT", descending: bool) -> "pyarrow.
     return table.take(indices)
 
 
+def _is_column_extension_type(ca: "pyarrow.ChunkedArray") -> bool:
+    """Whether the provided Arrow Table column is an extension array, using an Arrow
+    extension type.
+    """
+    return isinstance(ca.type, pyarrow.ExtensionType)
+
+
+def _concatenate_extension_column(ca: "pyarrow.ChunkedArray") -> "pyarrow.Array":
+    """Concatenate chunks of an extension column into a contiguous array.
+
+    This concatenation is required for creating copies and for .take() to work on
+    extension arrays.
+    See https://issues.apache.org/jira/browse/ARROW-16503.
+    """
+    if not _is_column_extension_type(ca):
+        raise ValueError(f"Chunked array isn't an extension array: {ca}")
+
+    if ca.num_chunks == 0:
+        # No-op for no-chunk chunked arrays, since there's nothing to concatenate.
+        return ca
+
+    chunk = ca.chunk(0)
+    return type(chunk).from_storage(
+        chunk.type, pyarrow.concat_arrays([c.storage for c in ca.chunks])
+    )
+
+
 def take_table(
     table: "pyarrow.Table",
     indices: Union[List[int], "pyarrow.Array", "pyarrow.ChunkedArray"],
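[Reviewer note] The two helpers above moved verbatim from `ray.air.util.transform_pyarrow` into `ray.data._internal.arrow_ops.transform_pyarrow` (the relocated copy also gains the missing `f` prefix on its `ValueError` message). A minimal sketch of how they are meant to be used, assuming Ray's tensor extension type and a pyarrow release where ARROW-16503 is still open:

```python
import numpy as np
import pyarrow

from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
from ray.data._internal.arrow_ops.transform_pyarrow import (
    _concatenate_extension_column,
    _is_column_extension_type,
)

# Build a two-chunk extension column of 2x2 tensors.
chunk = ArrowTensorArray.from_numpy(np.arange(8).reshape((2, 2, 2)))
col = pyarrow.chunked_array([chunk, chunk])

assert _is_column_extension_type(col)
# ChunkedArray.combine_chunks() mishandles extension arrays (ARROW-16503),
# so the helper concatenates the underlying storage arrays and rewraps them.
contiguous = _concatenate_extension_column(col)
assert len(contiguous) == 4
```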
diff --git a/python/ray/data/preprocessor.py b/python/ray/data/preprocessor.py
index cc5f59bc2c20..55f8c2145796 100644
--- a/python/ray/data/preprocessor.py
+++ b/python/ray/data/preprocessor.py
@@ -1,15 +1,14 @@
 import abc
 import warnings
 from enum import Enum
-from typing import TYPE_CHECKING, Optional, Union, Dict
-
-import numpy as np
+from typing import TYPE_CHECKING, Optional
 
 from ray.data import Dataset
 from ray.util.annotations import DeveloperAPI, PublicAPI
 
 if TYPE_CHECKING:
     import pandas as pd
+    import pyarrow
 
     from ray.air.data_batch_type import DataBatchType
 
@@ -39,7 +38,7 @@ class Preprocessor(abc.ABC):
     * ``_fit`` if your preprocessor is stateful. Otherwise, set
       ``_is_fittable=False``.
-    * ``_transform_pandas`` and/or ``_transform_numpy`` for best performance,
+    * ``_transform_pandas`` and/or ``_transform_arrow`` for best performance,
       implement both. Otherwise, the data will be converted to match the
       implemented method.
     """
@@ -146,14 +145,14 @@ def transform(self, dataset: Dataset) -> Dataset:
         self._transform_stats = transformed_ds.stats()
         return transformed_ds
 
-    def transform_batch(self, data: "DataBatchType") -> "DataBatchType":
+    def transform_batch(self, df: "DataBatchType") -> "DataBatchType":
         """Transform a single batch of data.
 
         The data will be converted to the format supported by the Preprocessor,
         based on which ``_transform_*`` method(s) are implemented.
 
         Args:
-            data: Input data batch.
+            df: Input data batch.
 
         Returns:
             DataBatchType:
@@ -169,7 +168,7 @@ def transform_batch(self, data: "DataBatchType") -> "DataBatchType":
             raise PreprocessorNotFittedException(
                 "`fit` must be called before `transform_batch`."
             )
-        return self._transform_batch(data)
+        return self._transform_batch(df)
 
     def _check_is_fitted(self) -> bool:
         """Returns whether this preprocessor is fitted.
@@ -188,50 +187,36 @@ def _fit(self, dataset: Dataset) -> "Preprocessor":
 
     def _determine_transform_to_use(self, data_format: str) -> str:
         """Determine which transform to use based on data format and implementation.
 
-        We will infer and pick the best transform to use:
-        * ``pandas`` data format prioritizes ``pandas`` transform if available.
-        * ``arrow`` and ``numpy`` data format prioritizes ``numpy`` transform if available.  # noqa: E501
-        * Fall back to what's available if no preferred path found.
+        * If only _transform_arrow is implemented, the data is converted to arrow.
+        * If only _transform_pandas is implemented, the data is converted to pandas.
+        * If both are implemented, the method matching the data format is picked,
+          for best performance.
+        * Implementation is defined as overriding the method in a sub-class.
         """
-        assert data_format in (
-            "pandas",
-            "arrow",
-            "numpy",
-        ), f"Unsupported data format: {data_format}"
-
+        assert data_format in ("pandas", "arrow")
+        has_transform_arrow = (
+            self.__class__._transform_arrow != Preprocessor._transform_arrow
+        )
         has_transform_pandas = (
             self.__class__._transform_pandas != Preprocessor._transform_pandas
         )
-        has_transform_numpy = (
-            self.__class__._transform_numpy != Preprocessor._transform_numpy
-        )
-
-        # Infer transform type by prioritizing native transformation to minimize
-        # data conversion cost.
-        if data_format == "pandas":
-            # Perform native pandas transformation if possible.
-            if has_transform_pandas:
+        if has_transform_arrow and has_transform_pandas:
+            # Both transforms available, so we delegate based on the dataset format to
+            # ensure minimal data conversions.
+            if data_format == "pandas":
                 transform_type = "pandas"
-            elif has_transform_numpy:
-                transform_type = "numpy"
-            else:
-                raise NotImplementedError(
-                    "None of `_transform_numpy` or `_transform_pandas` "
-                    f"are implemented for dataset format `{data_format}`."
-                )
-        elif data_format == "arrow" or data_format == "numpy":
-            # Arrow -> Numpy is more efficient
-            if has_transform_numpy:
-                transform_type = "numpy"
-            elif has_transform_pandas:
-                transform_type = "pandas"
-            else:
-                raise NotImplementedError(
-                    "None of `_transform_numpy` or `_transform_pandas` "
-                    f"are implemented for dataset format `{data_format}`."
-                )
-
+            elif data_format == "arrow":
+                transform_type = "arrow"
+        elif has_transform_pandas:
+            transform_type = "pandas"
+        elif has_transform_arrow:
+            transform_type = "arrow"
+        else:
+            raise NotImplementedError(
+                "Neither `_transform_arrow` nor `_transform_pandas` are implemented."
+            )
         return transform_type
 
     def _transform(self, dataset: Dataset) -> Dataset:
@@ -241,56 +226,52 @@ def _transform(self, dataset: Dataset) -> Dataset:
         dataset_format = dataset._dataset_format()
         if dataset_format not in ("pandas", "arrow"):
             raise ValueError(
-                f"Unsupported Dataset format: '{dataset_format}'. Only 'pandas' "
-                "and 'arrow' Dataset formats are supported."
+                f"Unsupported Dataset format: '{dataset_format}'. Only 'pandas' and "
+                "'arrow' Dataset formats are supported."
             )
 
         transform_type = self._determine_transform_to_use(dataset_format)
-        # Our user facing batch format should only be pandas or numpy, other
-        # formats {arrow, simple} are internal.
         if transform_type == "pandas":
             return dataset.map_batches(self._transform_pandas, batch_format="pandas")
-        elif transform_type == "numpy":
-            return dataset.map_batches(self._transform_numpy, batch_format="numpy")
+        elif transform_type == "arrow":
+            return dataset.map_batches(self._transform_arrow, batch_format="pyarrow")
         else:
             raise ValueError(
                 "Invalid transform type returned from _determine_transform_to_use; "
-                f'"pandas" and "numpy" allowed, but got: {transform_type}'
+                f'"pandas" and "arrow" allowed, but got: {transform_type}'
             )
 
-    def _transform_batch(self, data: "DataBatchType") -> "DataBatchType":
-        # For minimal install to locally import air modules
+    def _transform_batch(self, df: "DataBatchType") -> "DataBatchType":
         import pandas as pd
-        from ray.air.util.data_batch_conversion import (
-            convert_batch_type_to_pandas,
-            _convert_batch_type_to_numpy,
-        )
 
         try:
             import pyarrow
         except ImportError:
             pyarrow = None
 
-        if isinstance(data, pd.DataFrame):
+        if isinstance(df, pd.DataFrame):
             data_format = "pandas"
-        elif pyarrow is not None and isinstance(data, pyarrow.Table):
+        elif pyarrow is not None and isinstance(df, pyarrow.Table):
             data_format = "arrow"
-        elif isinstance(data, (dict, np.ndarray)):
-            data_format = "numpy"
         else:
             raise NotImplementedError(
-                "`transform_batch` is currently only implemented for Pandas "
-                "DataFrames, pyarrow Tables, NumPy ndarray and dictionary of "
-                f"ndarray. Got {type(data)}."
+                "`transform_batch` is currently only implemented for Pandas DataFrames "
+                f"and PyArrow Tables. Got {type(df)}."
             )
 
         transform_type = self._determine_transform_to_use(data_format)
         if transform_type == "pandas":
-            return self._transform_pandas(convert_batch_type_to_pandas(data))
-        elif transform_type == "numpy":
-            return self._transform_numpy(_convert_batch_type_to_numpy(data))
+            if data_format == "pandas":
+                return self._transform_pandas(df)
+            else:
+                return self._transform_pandas(df.to_pandas())
+        elif transform_type == "arrow":
+            if data_format == "arrow":
+                return self._transform_arrow(df)
+            else:
+                return self._transform_arrow(pyarrow.Table.from_pandas(df))
 
     @DeveloperAPI
     def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame":
@@ -298,8 +279,6 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame":
         raise NotImplementedError()
 
     @DeveloperAPI
-    def _transform_numpy(
-        self, np_data: Union[np.ndarray, Dict[str, np.ndarray]]
-    ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-        """Run the transformation on a data batch in a NumPy ndarray format."""
+    def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table":
+        """Run the transformation on a data batch in a PyArrow Table format."""
        raise NotImplementedError()
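[Reviewer note] With the dispatch above, `transform_batch` converts between pandas and arrow itself; callers hand it whichever batch they have. `BatchMapper` below loses its `batch_format` argument accordingly. A minimal usage sketch against the reverted API, assuming a build with this revert applied (function and column names are illustrative):

```python
import pandas as pd
import ray
from ray.data.preprocessors import BatchMapper

def drop_odd(df: pd.DataFrame) -> pd.DataFrame:
    # Keep only rows with an even value.
    return df[df["value"] % 2 == 0]

ds = ray.data.from_pandas(pd.DataFrame({"value": [1, 2, 3, 4]}))
preprocessor = BatchMapper(fn=drop_odd)   # no batch_format after the revert
transformed = preprocessor.transform(ds)  # UDF runs over pandas batches
```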
diff --git a/python/ray/data/preprocessors/batch_mapper.py b/python/ray/data/preprocessors/batch_mapper.py
index 55b1f176d390..90ad42771849 100644
--- a/python/ray/data/preprocessors/batch_mapper.py
+++ b/python/ray/data/preprocessors/batch_mapper.py
@@ -1,7 +1,4 @@
-from typing import Dict, Callable, Optional, Union, TYPE_CHECKING
-import warnings
-
-import numpy as np
+from typing import Callable, TYPE_CHECKING
 
 from ray.data.preprocessor import Preprocessor
 
@@ -27,8 +24,6 @@ class BatchMapper(Preprocessor):
     Use :class:`BatchMapper` to apply arbitrary operations like dropping a column.
 
     >>> import pandas as pd
-    >>> import numpy as np
-    >>> from typing import Dict
     >>> import ray
     >>> from ray.data.preprocessors import BatchMapper
     >>>
@@ -41,65 +36,19 @@ class BatchMapper(Preprocessor):
     >>> preprocessor = BatchMapper(fn)
     >>> preprocessor.transform(ds)  # doctest: +SKIP
     Dataset(num_blocks=1, num_rows=3, schema={X: int64})
-    >>>
-    >>> def fn_numpy(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
-    ...     return {"X": batch["X"]}
-    >>> preprocessor = BatchMapper(fn_numpy, batch_format="numpy")
-    >>> preprocessor.transform(ds)  # doctest: +SKIP
-    Dataset(num_blocks=1, num_rows=3, schema={X: int64})
 
     Args:
         fn: The function to apply to data batches.
-        batch_format: The preferred batch format to use in UDF. If not given,
-            we will infer based on the input dataset data format.
     """
 
     _is_fittable = False
 
-    def __init__(
-        self,
-        fn: Union[
-            Callable[["pandas.DataFrame"], "pandas.DataFrame"],
-            Callable[
-                [Union[np.ndarray, Dict[str, np.ndarray]]],
-                Union[np.ndarray, Dict[str, np.ndarray]],
-            ],
-        ],
-        batch_format: Optional[str] = None,
-        # TODO: Make batch_format required from user
-        # TODO: Introduce a "zero_copy" format
-        # TODO: We should reach consistency of args between BatchMapper and map_batches.
-    ):
-        if not batch_format:
-            warnings.warn(
-                "batch_format will be a required argument for BatchMapper in future "
-                "releases. Defaulting to 'pandas' batch format.",
-                DeprecationWarning,
-            )
-            batch_format = "pandas"
-        if batch_format and batch_format not in [
-            "pandas",
-            "numpy",
-        ]:
-            raise ValueError("BatchMapper only supports pandas and numpy batch format.")
-
-        self.batch_format = batch_format
+    def __init__(self, fn: Callable[["pandas.DataFrame"], "pandas.DataFrame"]):
         self.fn = fn
 
-    def _transform_numpy(
-        self, np_data: Union[np.ndarray, Dict[str, np.ndarray]]
-    ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-        return self.fn(np_data)
-
     def _transform_pandas(self, df: "pandas.DataFrame") -> "pandas.DataFrame":
         return self.fn(df)
 
-    def _determine_transform_to_use(self, data_format: str):
-        if self.batch_format:
-            return self.batch_format
-        else:
-            return super()._determine_transform_to_use(data_format)
-
     def __repr__(self):
         fn_name = getattr(self.fn, "__name__", self.fn)
         return f"{self.__class__.__name__}(fn={fn_name})"
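[Reviewer note] The suite deleted below exercised numpy batch formats end to end; the pandas path it also covered stays supported. A condensed sketch of that surviving behavior, assuming a build with this revert applied (column names are illustrative):

```python
import pandas as pd
import ray

ds = ray.data.from_pandas(pd.DataFrame({"column_1": [1, 2, 3, 4]}))

def add_one(df: pd.DataFrame) -> pd.DataFrame:
    df["column_1"] = df["column_1"] + 1
    return df

# batch_format="pandas" is the UDF surface that remains after the revert.
out_df = ds.map_batches(add_one, batch_format="pandas").to_pandas()
assert out_df["column_1"].tolist() == [2, 3, 4, 5]
```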
diff --git a/python/ray/data/tests/test_batch_mapper.py b/python/ray/data/tests/test_batch_mapper.py
deleted file mode 100644
index ce099caf7b75..000000000000
--- a/python/ray/data/tests/test_batch_mapper.py
+++ /dev/null
@@ -1,353 +0,0 @@
-import numpy as np
-import pandas as pd
-import pyarrow as pa
-
-import pytest
-from typing import Dict, Union
-from pandas.testing import assert_frame_equal
-
-import ray
-from ray.data.preprocessors import BatchMapper
-from ray.data.extensions import TensorArray
-from ray.air.constants import TENSOR_COLUMN_NAME
-from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
-
-
-# ===== Pandas dataset formats =====
-def ds_pandas_single_column_format():
-    in_df = pd.DataFrame({"column_1": [1, 2, 3, 4]})
-    ds = ray.data.from_pandas(in_df)
-    return ds
-
-
-def ds_pandas_multi_column_format():
-    in_df = pd.DataFrame({"column_1": [1, 2, 3, 4], "column_2": [1, -1, 1, -1]})
-    ds = ray.data.from_pandas(in_df)
-    return ds
-
-
-# ===== Arrow dataset formats =====
-def ds_arrow_single_column_format():
-    ds = ray.data.from_arrow(pa.table({"column_1": [1, 2, 3, 4]}))
-    return ds
-
-
-def ds_arrow_single_column_tensor_format():
-    ds = ray.data.from_arrow(
-        pa.table(
-            {
-                TENSOR_COLUMN_NAME: ArrowTensorArray.from_numpy(
-                    np.arange(12).reshape((3, 2, 2))
-                )
-            }
-        )
-    )
-    return ds
-
-
-def ds_arrow_multi_column_format():
-    ds = ray.data.from_arrow(
-        pa.table(
-            {
-                "column_1": [1, 2, 3, 4],
-                "column_2": [1, -1, 1, -1],
-            }
-        )
-    )
-    return ds
-
-
-# ===== Numpy dataset formats =====
-def ds_numpy_single_column_tensor_format():
-    ds = ray.data.from_numpy(np.arange(12).reshape((3, 2, 2)))
-    return ds
-
-
-def ds_numpy_list_of_ndarray_tensor_format():
-    ds = ray.data.from_numpy(
-        [np.arange(12).reshape((3, 2, 2)), np.arange(12, 24).reshape((3, 2, 2))]
-    )
-    return ds
-
-
-@pytest.mark.parametrize(
-    "ds_with_expected_pandas_numpy_df",
-    [
-        (
-            ds_pandas_single_column_format(),
-            pd.DataFrame(
-                {
-                    "column_1": [2, 3, 4, 5],
-                }
-            ),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: [2, 3, 4, 5],
-                }
-            ),
-        ),
-        (
-            ds_pandas_multi_column_format(),
-            pd.DataFrame(
-                {
-                    "column_1": [2, 3, 4, 5],
-                    "column_2": [2, -2, 2, -2],
-                }
-            ),
-            pd.DataFrame(
-                {
-                    "column_1": [2, 3, 4, 5],
-                    "column_2": [2, -2, 2, -2],
-                }
-            ),
-        ),
-    ],
-)
-def test_batch_mapper_pandas_data_format(ds_with_expected_pandas_numpy_df):
-    """Tests batch mapper functionality for pandas data format.
-
-    Note:
-        For single column pandas dataframes, we automatically convert it to
-        single column tensor with column name as `__value__`.
-    """
-    ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df
-
-    def add_and_modify_udf_pandas(df: "pd.DataFrame"):
-        df["column_1"] = df["column_1"] + 1
-        if "column_2" in df:
-            df["column_2"] *= 2
-        return df
-
-    def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
-        if isinstance(data, np.ndarray):
-            data += 1
-        else:
-            data["column_1"] = data["column_1"] + 1
-            if "column_2" in data:
-                data["column_2"] *= 2
-        return data
-
-    # Test map_batches
-    transformed_ds = ds.map_batches(add_and_modify_udf_pandas, batch_format="pandas")
-    out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_df)
-
-    transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy")
-    out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_numpy_df)
-
-    # Test BatchMapper
-    batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas")
-    batch_mapper.fit(ds)
-    transformed_ds = batch_mapper.transform(ds)
-    out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_df)
-
-    batch_mapper = BatchMapper(fn=add_and_modify_udf_numpy, batch_format="numpy")
-    batch_mapper.fit(ds)
-    transformed_ds = batch_mapper.transform(ds)
-    out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_numpy_df)
-
-
-@pytest.mark.parametrize(
-    "ds_with_expected_pandas_numpy_df",
-    [
-        (
-            ds_arrow_single_column_format(),
-            pd.DataFrame(
-                {
-                    "column_1": [2, 3, 4, 5],
-                }
-            ),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: [2, 3, 4, 5],
-                }
-            ),
-        ),
-        (
-            ds_arrow_single_column_tensor_format(),
-            pd.DataFrame(
-                {
-                    TENSOR_COLUMN_NAME: [
-                        [[1, 2], [3, 4]],
-                        [[5, 6], [7, 8]],
-                        [[9, 10], [11, 12]],
-                    ]
-                }
-            ),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 13).reshape((3, 2, 2)))
-                }
-            ),
-        ),
-        (
-            ds_arrow_multi_column_format(),
-            pd.DataFrame(
-                {
-                    "column_1": [2, 3, 4, 5],
-                    "column_2": [2, -2, 2, -2],
-                }
-            ),
-            pd.DataFrame(
-                {
-                    "column_1": [2, 3, 4, 5],
-                    "column_2": [2, -2, 2, -2],
-                }
-            ),
-        ),
-    ],
-)
-def test_batch_mapper_arrow_data_format(ds_with_expected_pandas_numpy_df):
-    """Tests batch mapper functionality for arrow data format.
-
-    Note:
-        For single column pandas dataframes, we automatically convert it to
-        single column tensor with column name as `__value__`.
-    """
-    ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df
-
-    def add_and_modify_udf_pandas(df: "pd.DataFrame"):
-        col_name = "column_1"
-        if len(df.columns) == 1:
-            col_name = list(df.columns)[0]
-        df[col_name] = df[col_name] + 1
-        if "column_2" in df:
-            df["column_2"] *= 2
-        return df
-
-    def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
-        if isinstance(data, np.ndarray):
-            data = data + 1
-        else:
-            data["column_1"] = data["column_1"] + 1
-            if "column_2" in data:
-                data["column_2"] = data["column_2"] * 2
-        return data
-
-    # Test map_batches
-    transformed_ds = ds.map_batches(add_and_modify_udf_pandas, batch_format="pandas")
-    out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_df)
-
-    transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy")
-    out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_numpy_df)
-
-    # Test BatchMapper
-    batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas")
-    batch_mapper.fit(ds)
-    transformed_ds = batch_mapper.transform(ds)
-    out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_df)
-
-    batch_mapper = BatchMapper(fn=add_and_modify_udf_numpy, batch_format="numpy")
-    batch_mapper.fit(ds)
-    transformed_ds = batch_mapper.transform(ds)
-    out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_numpy_df)
-
-
-@pytest.mark.parametrize(
-    "ds_with_expected_pandas_numpy_df",
-    [
-        (
-            ds_numpy_single_column_tensor_format(),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: [
-                        [[1, 2], [3, 4]],
-                        [[5, 6], [7, 8]],
-                        [[9, 10], [11, 12]],
-                    ]
-                }
-            ),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 13).reshape((3, 2, 2)))
-                }
-            ),
-        ),
-        (
-            ds_numpy_list_of_ndarray_tensor_format(),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: [
-                        [[1, 2], [3, 4]],
-                        [[5, 6], [7, 8]],
-                        [[9, 10], [11, 12]],
-                        [[13, 14], [15, 16]],
-                        [[17, 18], [19, 20]],
-                        [[21, 22], [23, 24]],
-                    ]
-                }
-            ),
-            pd.DataFrame(
-                {
-                    # Single column pandas automatically converts `TENSOR_COLUMN_NAME`
-                    # In UDFs
-                    TENSOR_COLUMN_NAME: TensorArray(np.arange(1, 25).reshape((6, 2, 2)))
-                }
-            ),
-        ),
-    ],
-)
-def test_batch_mapper_numpy_data_format(ds_with_expected_pandas_numpy_df):
-    """Tests batch mapper functionality for numpy data format.
-
-    Note:
-        For single column pandas dataframes, we automatically convert it to
-        single column tensor with column name as `__value__`.
-    """
-    ds, expected_df, expected_numpy_df = ds_with_expected_pandas_numpy_df
-
-    def add_and_modify_udf_pandas(df: "pd.DataFrame"):
-        col_name = list(df.columns)[0]
-        df[col_name] = df[col_name] + 1
-        return df
-
-    def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
-        data = data + 1
-        return data
-
-    # Test map_batches
-    transformed_ds = ds.map_batches(add_and_modify_udf_pandas, batch_format="pandas")
-    out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_df)
-
-    transformed_ds = ds.map_batches(add_and_modify_udf_numpy, batch_format="numpy")
-    out_df_map_batches = transformed_ds.to_pandas()
-    assert_frame_equal(out_df_map_batches, expected_numpy_df)
-
-    # Test BatchMapper
-    batch_mapper = BatchMapper(fn=add_and_modify_udf_pandas, batch_format="pandas")
-    batch_mapper.fit(ds)
-    transformed_ds = batch_mapper.transform(ds)
-    out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_df)
-
-    batch_mapper = BatchMapper(fn=add_and_modify_udf_numpy, batch_format="numpy")
-    batch_mapper.fit(ds)
-    transformed_ds = batch_mapper.transform(ds)
-    out_df = transformed_ds.to_pandas()
-    assert_frame_equal(out_df, expected_numpy_df)
-
-
-if __name__ == "__main__":
-    import sys
-
-    sys.exit(pytest.main(["-sv", __file__]))
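[Reviewer note] The test updates below pin down the dispatch matrix: pandas-only implementations always see pandas, arrow-only always see arrow, and both-implemented follows the input format. A minimal sketch of the arrow-only case, assuming the `Preprocessor` base class as patched above (class and column names are illustrative):

```python
import pandas as pd
import pyarrow

from ray.data.preprocessor import Preprocessor


class ArrowOnly(Preprocessor):
    _is_fittable = False  # stateless, so no fit() is needed

    def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table":
        return table


# A pandas batch is converted to a pyarrow.Table when only _transform_arrow
# is implemented, matching test_arrow_pandas_support_transform_batch_pandas.
df = pd.DataFrame({"A": [1, 2, 3]})
assert isinstance(ArrowOnly().transform_batch(df), pyarrow.Table)
```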
diff --git a/python/ray/data/tests/test_preprocessors.py b/python/ray/data/tests/test_preprocessors.py
index 7ded8afbd83a..a3edb2b4ecb8 100644
--- a/python/ray/data/tests/test_preprocessors.py
+++ b/python/ray/data/tests/test_preprocessors.py
@@ -2,7 +2,6 @@
 from collections import Counter
 import re
 from unittest.mock import patch
-from typing import Dict, Union
 
 import numpy as np
 import pandas as pd
@@ -44,28 +43,22 @@ class DummyPreprocessorWithPandas(DummyPreprocessorWithNothing):
         def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame":
             return df
 
-    class DummyPreprocessorWithNumpy(DummyPreprocessorWithNothing):
-        batch_format = "numpy"
+    class DummyPreprocessorWithArrow(DummyPreprocessorWithNothing):
+        def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table":
+            return table
 
-        def _transform_numpy(
-            self, np_data: Union[np.ndarray, Dict[str, np.ndarray]]
-        ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-            return np_data
-
-    class DummyPreprocessorWithPandasAndNumpy(DummyPreprocessorWithNothing):
+    class DummyPreprocessorWithPandasAndArrow(DummyPreprocessorWithNothing):
         def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame":
             return df
 
-        def _transform_numpy(
-            self, np_data: Union[np.ndarray, Dict[str, np.ndarray]]
-        ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-            return np_data
+        def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table":
+            return table
 
     yield (
         DummyPreprocessorWithNothing(),
         DummyPreprocessorWithPandas(),
-        DummyPreprocessorWithNumpy(),
-        DummyPreprocessorWithPandasAndNumpy(),
+        DummyPreprocessorWithArrow(),
+        DummyPreprocessorWithPandasAndArrow(),
     )
 
 
@@ -1138,6 +1131,36 @@ def test_normalizer():
     assert out_df.equals(expected_df)
 
 
+def test_batch_mapper():
+    """Tests batch mapper functionality."""
+    old_column = [1, 2, 3, 4]
+    to_be_modified = [1, -1, 1, -1]
+    in_df = pd.DataFrame.from_dict(
+        {"old_column": old_column, "to_be_modified": to_be_modified}
+    )
+    ds = ray.data.from_pandas(in_df)
+
+    def add_and_modify_udf(df: "pd.DataFrame"):
+        df["new_col"] = df["old_column"] + 1
+        df["to_be_modified"] *= 2
+        return df
+
+    batch_mapper = BatchMapper(fn=add_and_modify_udf)
+    batch_mapper.fit(ds)
+    transformed = batch_mapper.transform(ds)
+    out_df = transformed.to_pandas()
+
+    expected_df = pd.DataFrame.from_dict(
+        {
+            "old_column": old_column,
+            "to_be_modified": [2, -2, 2, -2],
+            "new_col": [2, 3, 4, 5],
+        }
+    )
+
+    assert out_df.equals(expected_df)
+
+
 def test_power_transformer():
     """Tests basic PowerTransformer functionality."""
 
@@ -1557,13 +1580,13 @@ def test_simple_hash():
     assert simple_hash([1, 2, "apple"], 100) == 37
 
 
-def test_numpy_pandas_support_simple_dataset(create_dummy_preprocessors):
+def test_arrow_pandas_support_simple_dataset(create_dummy_preprocessors):
     # Case 1: simple dataset. No support
     (
         with_nothing,
         with_pandas,
-        with_numpy,
-        with_pandas_and_numpy,
+        with_arrow,
+        with_pandas_and_arrow,
     ) = create_dummy_preprocessors
 
     ds = ray.data.range(10)
@@ -1574,19 +1597,19 @@ def test_numpy_pandas_support_simple_dataset(create_dummy_preprocessors):
         with_pandas.transform(ds)
 
     with pytest.raises(ValueError):
-        with_numpy.transform(ds)
+        with_arrow.transform(ds)
 
     with pytest.raises(ValueError):
-        with_pandas_and_numpy.transform(ds)
+        with_pandas_and_arrow.transform(ds)
 
 
-def test_numpy_pandas_support_pandas_dataset(create_dummy_preprocessors):
+def test_arrow_pandas_support_pandas_dataset(create_dummy_preprocessors):
     # Case 2: pandas dataset
     (
         with_nothing,
         with_pandas,
-        _,
-        with_pandas_and_numpy,
+        with_arrow,
+        with_pandas_and_arrow,
     ) = create_dummy_preprocessors
 
     df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"])
 
     assert with_pandas.transform(ds)._dataset_format() == "pandas"
 
-    assert with_pandas_and_numpy.transform(ds)._dataset_format() == "pandas"
+    assert with_arrow.transform(ds)._dataset_format() == "arrow"
+
+    assert with_pandas_and_arrow.transform(ds)._dataset_format() == "pandas"
 
 
-def test_numpy_pandas_support_arrow_dataset(create_dummy_preprocessors):
+def test_arrow_pandas_support_arrow_dataset(create_dummy_preprocessors):
     # Case 3: arrow dataset
     (
         with_nothing,
         with_pandas,
-        with_numpy,
-        with_pandas_and_numpy,
+        with_arrow,
+        with_pandas_and_arrow,
     ) = create_dummy_preprocessors
 
     df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"])
 
     assert with_pandas.transform(ds)._dataset_format() == "pandas"
 
-    assert with_numpy.transform(ds)._dataset_format() == "arrow"
+    assert with_arrow.transform(ds)._dataset_format() == "arrow"
 
-    # Auto select data_format = "arrow" -> batch_format = "numpy" for performance
-    assert with_pandas_and_numpy.transform(ds)._dataset_format() == "arrow"
+    assert with_pandas_and_arrow.transform(ds)._dataset_format() == "arrow"
 
 
-def test_numpy_pandas_support_transform_batch_wrong_format(create_dummy_preprocessors):
+def test_arrow_pandas_support_transform_batch_wrong_format(create_dummy_preprocessors):
     # Case 1: simple dataset. No support
     (
         with_nothing,
         with_pandas,
-        with_numpy,
-        with_pandas_and_numpy,
+        with_arrow,
+        with_pandas_and_arrow,
     ) = create_dummy_preprocessors
 
     batch = [1, 2, 3]
@@ -1638,105 +1662,51 @@ def test_numpy_pandas_support_transform_batch_wrong_format(create_dummy_preproce
         with_pandas.transform_batch(batch)
 
     with pytest.raises(NotImplementedError):
-        with_numpy.transform_batch(batch)
+        with_arrow.transform_batch(batch)
 
     with pytest.raises(NotImplementedError):
-        with_pandas_and_numpy.transform_batch(batch)
+        with_pandas_and_arrow.transform_batch(batch)
 
 
-def test_numpy_pandas_support_transform_batch_pandas(create_dummy_preprocessors):
+def test_arrow_pandas_support_transform_batch_pandas(create_dummy_preprocessors):
     # Case 2: pandas dataset
     (
         with_nothing,
         with_pandas,
-        with_numpy,
-        with_pandas_and_numpy,
+        with_arrow,
+        with_pandas_and_arrow,
     ) = create_dummy_preprocessors
 
     df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"])
-    df_single_column = pd.DataFrame([1, 2, 3], columns=["A"])
     with pytest.raises(NotImplementedError):
         with_nothing.transform_batch(df)
-    with pytest.raises(NotImplementedError):
-        with_nothing.transform_batch(df_single_column)
 
     assert isinstance(with_pandas.transform_batch(df), pd.DataFrame)
-    assert isinstance(with_pandas.transform_batch(df_single_column), pd.DataFrame)
 
-    assert isinstance(with_numpy.transform_batch(df), (np.ndarray, dict))
-    # We can get pd.DataFrame after returning numpy data from UDF
-    assert isinstance(with_numpy.transform_batch(df_single_column), (np.ndarray, dict))
+    assert isinstance(with_arrow.transform_batch(df), pyarrow.Table)
 
-    assert isinstance(with_pandas_and_numpy.transform_batch(df), pd.DataFrame)
-    assert isinstance(
-        with_pandas_and_numpy.transform_batch(df_single_column), pd.DataFrame
-    )
+    assert isinstance(with_pandas_and_arrow.transform_batch(df), pd.DataFrame)
 
 
-def test_numpy_pandas_support_transform_batch_arrow(create_dummy_preprocessors):
+def test_arrow_pandas_support_transform_batch_arrow(create_dummy_preprocessors):
     # Case 3: arrow dataset
     (
         with_nothing,
         with_pandas,
-        with_numpy,
-        with_pandas_and_numpy,
+        with_arrow,
+        with_pandas_and_arrow,
     ) = create_dummy_preprocessors
 
     df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["A", "B", "C"])
-    df_single_column = pd.DataFrame([1, 2, 3], columns=["A"])
-
     table = pyarrow.Table.from_pandas(df)
-    table_single_column = pyarrow.Table.from_pandas(df_single_column)
     with pytest.raises(NotImplementedError):
         with_nothing.transform_batch(table)
-    with pytest.raises(NotImplementedError):
-        with_nothing.transform_batch(table_single_column)
 
     assert isinstance(with_pandas.transform_batch(table), pd.DataFrame)
-    assert isinstance(with_pandas.transform_batch(table_single_column), pd.DataFrame)
-
-    assert isinstance(with_numpy.transform_batch(table), (np.ndarray, dict))
-    # We can get pyarrow.Table after returning numpy data from UDF
-    assert isinstance(
-        with_numpy.transform_batch(table_single_column), (np.ndarray, dict)
-    )
 
-    # Auto select data_format = "arrow" -> batch_format = "numpy" for performance
-    assert isinstance(with_pandas_and_numpy.transform_batch(table), (np.ndarray, dict))
-    # We can get pyarrow.Table after returning numpy data from UDF
-    assert isinstance(
-        with_pandas_and_numpy.transform_batch(table_single_column), (np.ndarray, dict)
-    )
-
-
-def test_numpy_pandas_support_transform_batch_tensor(create_dummy_preprocessors):
-    # Case 4: tensor dataset created by from numpy data directly
-    (
-        with_nothing,
-        _,
-        with_numpy,
-        with_pandas_and_numpy,
-    ) = create_dummy_preprocessors
-
-    np_data = np.arange(12).reshape(3, 2, 2)
-    np_single_column = {"A": np.arange(12).reshape(3, 2, 2)}
-    np_multi_column = {
-        "A": np.arange(12).reshape(3, 2, 2),
-        "B": np.arange(12, 24).reshape(3, 2, 2),
-    }
-
-    with pytest.raises(NotImplementedError):
-        with_nothing.transform_batch(np_data)
-    with pytest.raises(NotImplementedError):
-        with_nothing.transform_batch(np_single_column)
-    with pytest.raises(NotImplementedError):
-        with_nothing.transform_batch(np_multi_column)
-    assert isinstance(with_numpy.transform_batch(np_data), np.ndarray)
-    assert isinstance(with_numpy.transform_batch(np_single_column), dict)
-    assert isinstance(with_numpy.transform_batch(np_multi_column), dict)
+    assert isinstance(with_arrow.transform_batch(table), pyarrow.Table)
 
-    assert isinstance(with_pandas_and_numpy.transform_batch(np_data), np.ndarray)
-    assert isinstance(with_pandas_and_numpy.transform_batch(np_single_column), dict)
-    assert isinstance(with_pandas_and_numpy.transform_batch(np_multi_column), dict)
+    assert isinstance(with_pandas_and_arrow.transform_batch(table), pyarrow.Table)
 
 
 if __name__ == "__main__":
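[Reviewer note] Putting the reverted pieces together: a stateless preprocessor that implements both transforms, so the base class picks the zero-conversion path for each input format. A minimal sketch under the same assumptions as above (class and column names are illustrative):

```python
import pandas as pd
import pyarrow
import pyarrow.compute as pc

from ray.data.preprocessor import Preprocessor


class DoubleValue(Preprocessor):
    _is_fittable = False  # stateless, so no fit() is needed

    def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame":
        df["value"] = df["value"] * 2
        return df

    def _transform_arrow(self, table: "pyarrow.Table") -> "pyarrow.Table":
        idx = table.schema.get_field_index("value")
        return table.set_column(idx, "value", pc.multiply(table["value"], 2))


prep = DoubleValue()
df = pd.DataFrame({"value": [1, 2, 3]})
# pandas in -> pandas out; arrow in -> arrow out (no conversion either way).
assert isinstance(prep.transform_batch(df), pd.DataFrame)
assert isinstance(prep.transform_batch(pyarrow.Table.from_pandas(df)), pyarrow.Table)
```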