diff --git a/doc/source/data/consuming-datasets.rst b/doc/source/data/consuming-datasets.rst index 401fda1a5ff1..d10b690e58f2 100644 --- a/doc/source/data/consuming-datasets.rst +++ b/doc/source/data/consuming-datasets.rst @@ -41,9 +41,8 @@ Datasets can be consumed a row at a time using the or a batch at a time using the :meth:`ds.iter_batches() ` API, where you can specify batch size as well as the desired batch format. By default, the batch format is -``"native"``, which means that the batch format that's native to the data type will be -returned. For tabular data, the native format is a Pandas DataFrame; for Python objects, -it's a list. +``"default"``. For tabular data, the default format is a Pandas DataFrame; for Python +objects, it's a list. .. literalinclude:: ./doc_code/accessing_datasets.py :language: python @@ -109,7 +108,7 @@ to repartition the Dataset before writing out. :start-after: __write_json_begin__ :end-before: __write_json_end__ -.. tabbed:: NumPy +.. tabbed:: NumPy .. literalinclude:: ./doc_code/saving_datasets.py :language: python diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index 2a6e8db80eee..3f08bc6aef28 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -113,7 +113,7 @@ Transforming / Consuming Tensor Data Like any other Dataset, Datasets with tensor columns can be consumed / transformed in batches via the :meth:`ds.iter_batches(batch_format=\) ` and :meth:`ds.map_batches(fn, batch_format=\) ` APIs. This section shows the available batch formats and their behavior: -.. tabbed:: "native" (default) +.. tabbed:: "default" **Single-column**: diff --git a/doc/source/data/transforming-datasets.rst b/doc/source/data/transforming-datasets.rst index 03ed1eefa168..a90d3c79252a 100644 --- a/doc/source/data/transforming-datasets.rst +++ b/doc/source/data/transforming-datasets.rst @@ -121,9 +121,9 @@ Choose the *batch format* of the data given to UDFs by setting the ``batch_format`` option of :meth:`.map_batches() `. Here is an overview of the available batch formats: -.. tabbed:: "native" (default) +.. tabbed:: "default" - The "native" batch format presents data as follows for each Dataset type: + The "default" batch format presents data as follows for each Dataset type: * **Tabular Datasets**: Each batch will be a `pandas.DataFrame `__. diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index 806fe0621355..87a1cae730dd 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -36,7 +36,7 @@ def batch_blocks( prefetch_blocks: int = 0, clear_block_after_read: bool = False, batch_size: Optional[int] = None, - batch_format: str = "native", + batch_format: str = "default", drop_last: bool = False, shuffle_buffer_min_size: Optional[int] = None, shuffle_seed: Optional[int] = None, @@ -58,10 +58,10 @@ def batch_blocks( the block will never be accessed again. batch_size: Record batch size, or None to let the system pick. batch_format: The format in which to return each batch. - Specify "native" to use the current block format (promoting + Specify "default" to use the current block format (promoting Arrow to pandas automatically), "pandas" to select ``pandas.DataFrame`` or "pyarrow" to select - ``pyarrow.Table``. Default is "native". + ``pyarrow.Table``. Default is "default". drop_last: Whether to drop the last batch if it's incomplete. shuffle_buffer_min_size: If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum @@ -137,8 +137,8 @@ def get_batches(block: Optional[ObjectRef[Block]] = None) -> Iterator[BatchType] def _format_batch(batch: Block, batch_format: str) -> BatchType: - if batch_format == "native": - batch = BlockAccessor.for_block(batch).to_native() + if batch_format == "default" or batch_format == "native": + batch = BlockAccessor.for_block(batch).to_default() elif batch_format == "pandas": batch = BlockAccessor.for_block(batch).to_pandas() elif batch_format == "pyarrow": diff --git a/python/ray/data/_internal/table_block.py b/python/ray/data/_internal/table_block.py index 9a372d697ab0..8e5e701b3e00 100644 --- a/python/ray/data/_internal/table_block.py +++ b/python/ray/data/_internal/table_block.py @@ -123,14 +123,14 @@ def _get_row(self, index: int, copy: bool = False) -> Union[TableRow, np.ndarray def _build_tensor_row(row: TableRow) -> np.ndarray: raise NotImplementedError - def to_native(self) -> Block: + def to_default(self) -> Block: if self.is_tensor_wrapper(): - native = self.to_numpy() + default = self.to_numpy() else: # Always promote Arrow blocks to pandas for consistency, since # we lazily convert pandas->Arrow internally for efficiency. - native = self.to_pandas() - return native + default = self.to_pandas() + return default def column_names(self) -> List[str]: raise NotImplementedError diff --git a/python/ray/data/block.py b/python/ray/data/block.py index 4b6bc582ea41..c1b061e9cc14 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -142,7 +142,7 @@ def __call__(self, __arg: T) -> U: # by default. When block splitting is off, the type is a plain block. MaybeBlockPartition = Union[Block, BlockPartition] -VALID_BATCH_FORMATS = ["native", "pandas", "pyarrow", "numpy"] +VALID_BATCH_FORMATS = ["default", "native", "pandas", "pyarrow", "numpy"] @DeveloperAPI @@ -302,8 +302,8 @@ def to_block(self) -> Block: """Return the base block that this accessor wraps.""" raise NotImplementedError - def to_native(self) -> Block: - """Return the native data format for this accessor.""" + def to_default(self) -> Block: + """Return the default data format for this accessor.""" return self.to_block() def to_batch_format(self, batch_format: str) -> DataBatch: @@ -315,8 +315,8 @@ def to_batch_format(self, batch_format: str) -> DataBatch: Returns: This block formatted as the provided batch format. """ - if batch_format == "native": - return self.to_native() + if batch_format == "default" or batch_format == "native": + return self.to_default() elif batch_format == "pandas": return self.to_pandas() elif batch_format == "pyarrow": diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 69a058ba3633..2e4185c4f6b2 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -18,6 +18,7 @@ Union, ) from uuid import uuid4 +import warnings import numpy as np @@ -310,7 +311,7 @@ def map_batches( *, batch_size: Optional[int] = 4096, compute: Union[str, ComputeStrategy] = None, - batch_format: str = "native", + batch_format: str = "default", fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, @@ -376,11 +377,11 @@ def map_batches( :class:`ActorPoolStrategy(min, max) ` instance. If using callable classes for fn, the actor compute strategy must be used. - batch_format: Specify "native" to use the native block format (promotes + batch_format: Specify "default" to use the default block format (promotes tables to Pandas and tensors to NumPy), "pandas" to select ``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy" to select ``numpy.ndarray`` for tensor datasets and - ``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "native". + ``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default". fn_args: Positional arguments to pass to ``fn``, after the data batch. These arguments will be top-level arguments in the underlying Ray task that's submitted. @@ -402,6 +403,12 @@ def map_batches( import pandas as pd import pyarrow as pa + if batch_format == "native": + warnings.warn( + "The 'native' batch format has been renamed 'default'.", + DeprecationWarning, + ) + if batch_size is not None and batch_size < 1: raise ValueError("Batch size cannot be negative or 0") @@ -2314,15 +2321,15 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]] try: dataset_format = self._dataset_format() except ValueError: - # Dataset is empty or cleared, so fall back to "native". - batch_format = "native" + # Dataset is empty or cleared, so fall back to "default". + batch_format = "default" else: batch_format = ( "pyarrow" if dataset_format == "arrow" else "pandas" if dataset_format == "pandas" - else "native" + else "default" ) for batch in self.iter_batches( batch_size=None, prefetch_blocks=prefetch_blocks, batch_format=batch_format @@ -2336,7 +2343,7 @@ def iter_batches( *, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, - batch_format: str = "native", + batch_format: str = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, @@ -2358,11 +2365,11 @@ def iter_batches( The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. batch_format: The format in which to return each batch. - Specify "native" to use the native block format (promoting + Specify "default" to use the default block format (promoting tables to Pandas and tensors to NumPy), "pandas" to select ``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy" to select ``numpy.ndarray`` for tensor datasets and - ``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "native". + ``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default". drop_last: Whether to drop the last batch if it's incomplete. local_shuffle_buffer_size: If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the @@ -2374,6 +2381,12 @@ def iter_batches( Returns: An iterator over record batches. """ + if batch_format == "native": + warnings.warn( + "The 'native' batch format has been renamed 'default'.", + DeprecationWarning, + ) + blocks = self._plan.execute() stats = self._plan.stats() diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index a4d444b6212f..3f980bb606d7 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -14,6 +14,7 @@ Tuple, Union, ) +import warnings import ray from ray.data._internal import progress_bar @@ -137,7 +138,7 @@ def iter_batches( *, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, - batch_format: str = "native", + batch_format: str = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, @@ -160,10 +161,10 @@ def iter_batches( The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. batch_format: The format in which to return each batch. - Specify "native" to use the current block format (promoting + Specify "default" to use the current block format (promoting Arrow to pandas automatically), "pandas" to select ``pandas.DataFrame`` or "pyarrow" to select - ``pyarrow.Table``. Default is "native". + ``pyarrow.Table``. Default is "default". drop_last: Whether to drop the last batch if it's incomplete. local_shuffle_buffer_size: If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the @@ -178,6 +179,12 @@ def iter_batches( Returns: An iterator over record batches. """ + if batch_format == "native": + warnings.warn( + "The 'native' batch format has been renamed 'default'.", + DeprecationWarning, + ) + if self._executed[0]: raise RuntimeError("Pipeline cannot be read multiple times.") time_start = time.perf_counter() @@ -721,7 +728,7 @@ def map_batches( *, batch_size: Optional[int] = 4096, compute: Union[str, ComputeStrategy] = None, - batch_format: str = "native", + batch_format: str = "default", fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, @@ -951,7 +958,7 @@ def iter_tf_batches( *, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, - batch_format: str = "native", + batch_format: str = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, @@ -973,7 +980,7 @@ def iter_torch_batches( *, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, - batch_format: str = "native", + batch_format: str = "default", drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, diff --git a/python/ray/data/grouped_dataset.py b/python/ray/data/grouped_dataset.py index 7a8065f882b2..68e221387657 100644 --- a/python/ray/data/grouped_dataset.py +++ b/python/ray/data/grouped_dataset.py @@ -185,7 +185,7 @@ def map_groups( fn: Union[CallableClass, Callable[[BatchType], BatchType]], *, compute: Union[str, ComputeStrategy] = None, - batch_format: str = "native", + batch_format: str = "default", **ray_remote_args, ) -> "Dataset[Any]": # TODO AttributeError: 'GroupedDataset' object has no attribute 'map_groups' @@ -230,7 +230,7 @@ def map_groups( batch of zero or more records, similar to map_batches(). compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. - batch_format: Specify "native" to use the native block format + batch_format: Specify "default" to use the default block format (promotes Arrow to pandas), "pandas" to select ``pandas.DataFrame`` as the batch format, or "pyarrow" to select ``pyarrow.Table``. diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 33c3078d1180..8bbbfd407b3c 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -1735,10 +1735,14 @@ def test_iter_batches_basic(ray_start_regular_shared): assert all(isinstance(col, np.ndarray) for col in batch.values()) pd.testing.assert_frame_equal(pd.DataFrame(batch), df) - # Native format. + # Native format (deprecated). for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="native"), dfs): assert BlockAccessor.for_block(batch).to_pandas().equals(df) + # Default format. + for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="default"), dfs): + assert BlockAccessor.for_block(batch).to_pandas().equals(df) + # Batch size. batch_size = 2 batches = list(ds.iter_batches(batch_size=batch_size, batch_format="pandas")) diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py index 31cef96e7439..cc78fb2b3546 100644 --- a/python/ray/data/tests/test_split.py +++ b/python/ray/data/tests/test_split.py @@ -662,7 +662,7 @@ def equalize_helper(input_block_lists): for block_ref, _ in blocklist.get_blocks_with_metadata(): block = ray.get(block_ref) block_accessor = BlockAccessor.for_block(block) - block_list.append(block_accessor.to_native()) + block_list.append(block_accessor.to_default()) result_block_lists.append(block_list) return result_block_lists diff --git a/python/ray/train/tests/test_trainer.py b/python/ray/train/tests/test_trainer.py index 8b4b6a52d7a7..53168ea09273 100644 --- a/python/ray/train/tests/test_trainer.py +++ b/python/ray/train/tests/test_trainer.py @@ -1119,7 +1119,7 @@ def get_dataset(): for _ in range(num_epochs): dataset_this_epoch = next(pipeline_iterator) data_this_epoch = [] - for batch in dataset_this_epoch.iter_batches(batch_format="native"): + for batch in dataset_this_epoch.iter_batches(batch_format="default"): data_this_epoch.extend(batch) data_all_epochs.append(data_this_epoch) return data_all_epochs @@ -1144,7 +1144,7 @@ def get_dataset(): for _ in range(2): dataset_this_epoch = next(pipeline_iterator) data_this_epoch = [] - for batch in dataset_this_epoch.iter_batches(batch_format="native"): + for batch in dataset_this_epoch.iter_batches(batch_format="default"): data_this_epoch.extend(batch) if len(data_all_epochs) > 0: