Skip to content

Commit

Permalink
[Datasets] Deprecate "native" batch format in favor of "default" (#…
Browse files (browse the repository at this point in the history)
  • Loading branch information
bveeramani authored Sep 14, 2022
1 parent 38d87c2 commit 5043b1e
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 42 deletions.
7 changes: 3 additions & 4 deletions doc/source/data/consuming-datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ Datasets can be consumed a row at a time using the
or a batch at a time using the
:meth:`ds.iter_batches() <ray.data.Dataset.iter_batches>` API, where you can specify
batch size as well as the desired batch format. By default, the batch format is
``"native"``, which means that the batch format that's native to the data type will be
returned. For tabular data, the native format is a Pandas DataFrame; for Python objects,
it's a list.
``"default"``. For tabular data, the default format is a pandas DataFrame; for tensor
data, it's a NumPy ndarray; and for Python objects, it's a list.

.. literalinclude:: ./doc_code/accessing_datasets.py
:language: python
Expand Down Expand Up @@ -109,7 +108,7 @@ to repartition the Dataset before writing out.
:start-after: __write_json_begin__
:end-before: __write_json_end__

.. tabbed:: NumPy
.. tabbed:: NumPy

.. literalinclude:: ./doc_code/saving_datasets.py
:language: python
Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/dataset-tensor-support.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Transforming / Consuming Tensor Data

Like any other Dataset, Datasets with tensor columns can be consumed / transformed in batches via the :meth:`ds.iter_batches(batch_format=\<format\>) <ray.data.Dataset.iter_batches>` and :meth:`ds.map_batches(fn, batch_format=\<format\>) <ray.data.Dataset.map_batches>` APIs. This section shows the available batch formats and their behavior:

.. tabbed:: "native" (default)
.. tabbed:: "default"

**Single-column**:

Expand Down
4 changes: 2 additions & 2 deletions doc/source/data/transforming-datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ Choose the *batch format* of the data given to UDFs
by setting the ``batch_format`` option of :meth:`.map_batches() <ray.data.Dataset.map_batches>`.
Here is an overview of the available batch formats:

.. tabbed:: "native" (default)
.. tabbed:: "default"

The "native" batch format presents data as follows for each Dataset type:
The "default" batch format presents data as follows for each Dataset type:

* **Tabular Datasets**: Each batch will be a
`pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
Expand Down
10 changes: 5 additions & 5 deletions python/ray/data/_internal/block_batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def batch_blocks(
prefetch_blocks: int = 0,
clear_block_after_read: bool = False,
batch_size: Optional[int] = None,
batch_format: str = "native",
batch_format: str = "default",
drop_last: bool = False,
shuffle_buffer_min_size: Optional[int] = None,
shuffle_seed: Optional[int] = None,
Expand All @@ -58,10 +58,10 @@ def batch_blocks(
the block will never be accessed again.
batch_size: Record batch size, or None to let the system pick.
batch_format: The format in which to return each batch.
Specify "native" to use the current block format (promoting
Specify "default" to use the current block format (promoting
Arrow to pandas automatically), "pandas" to
select ``pandas.DataFrame`` or "pyarrow" to select
``pyarrow.Table``. Default is "native".
``pyarrow.Table``. Default is "default".
drop_last: Whether to drop the last batch if it's incomplete.
shuffle_buffer_min_size: If non-None, the data will be randomly shuffled using a
local in-memory shuffle buffer, and this value will serve as the minimum
Expand Down Expand Up @@ -137,8 +137,8 @@ def get_batches(block: Optional[ObjectRef[Block]] = None) -> Iterator[BatchType]


def _format_batch(batch: Block, batch_format: str) -> BatchType:
if batch_format == "native":
batch = BlockAccessor.for_block(batch).to_native()
if batch_format == "default" or batch_format == "native":
batch = BlockAccessor.for_block(batch).to_default()
elif batch_format == "pandas":
batch = BlockAccessor.for_block(batch).to_pandas()
elif batch_format == "pyarrow":
Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/_internal/table_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ def _get_row(self, index: int, copy: bool = False) -> Union[TableRow, np.ndarray
def _build_tensor_row(row: TableRow) -> np.ndarray:
raise NotImplementedError

def to_native(self) -> Block:
def to_default(self) -> Block:
if self.is_tensor_wrapper():
native = self.to_numpy()
default = self.to_numpy()
else:
# Always promote Arrow blocks to pandas for consistency, since
# we lazily convert pandas->Arrow internally for efficiency.
native = self.to_pandas()
return native
default = self.to_pandas()
return default

def column_names(self) -> List[str]:
raise NotImplementedError
Expand Down
10 changes: 5 additions & 5 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def __call__(self, __arg: T) -> U:
# by default. When block splitting is off, the type is a plain block.
MaybeBlockPartition = Union[Block, BlockPartition]

VALID_BATCH_FORMATS = ["native", "pandas", "pyarrow", "numpy"]
VALID_BATCH_FORMATS = ["default", "native", "pandas", "pyarrow", "numpy"]


@DeveloperAPI
Expand Down Expand Up @@ -302,8 +302,8 @@ def to_block(self) -> Block:
"""Return the base block that this accessor wraps."""
raise NotImplementedError

def to_native(self) -> Block:
"""Return the native data format for this accessor."""
def to_default(self) -> Block:
"""Return the default data format for this accessor."""
return self.to_block()

def to_batch_format(self, batch_format: str) -> DataBatch:
Expand All @@ -315,8 +315,8 @@ def to_batch_format(self, batch_format: str) -> DataBatch:
Returns:
This block formatted as the provided batch format.
"""
if batch_format == "native":
return self.to_native()
if batch_format == "default" or batch_format == "native":
return self.to_default()
elif batch_format == "pandas":
return self.to_pandas()
elif batch_format == "pyarrow":
Expand Down
31 changes: 22 additions & 9 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Union,
)
from uuid import uuid4
import warnings

import numpy as np

Expand Down Expand Up @@ -310,7 +311,7 @@ def map_batches(
*,
batch_size: Optional[int] = 4096,
compute: Union[str, ComputeStrategy] = None,
batch_format: str = "native",
batch_format: str = "default",
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
Expand Down Expand Up @@ -376,11 +377,11 @@ def map_batches(
:class:`ActorPoolStrategy(min, max) <ray.data.ActorPoolStrategy>`
instance. If using callable classes for fn, the actor compute strategy
must be used.
batch_format: Specify "native" to use the native block format (promotes
batch_format: Specify "default" to use the default block format (promotes
tables to Pandas and tensors to NumPy), "pandas" to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy"
to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "native".
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
fn_args: Positional arguments to pass to ``fn``, after the data batch. These
arguments will be top-level arguments in the underlying Ray task that's
submitted.
Expand All @@ -402,6 +403,12 @@ def map_batches(
import pandas as pd
import pyarrow as pa

if batch_format == "native":
warnings.warn(
"The 'native' batch format has been renamed 'default'.",
DeprecationWarning,
)

if batch_size is not None and batch_size < 1:
raise ValueError("Batch size cannot be negative or 0")

Expand Down Expand Up @@ -2314,15 +2321,15 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterator[Union[T, TableRow]]
try:
dataset_format = self._dataset_format()
except ValueError:
# Dataset is empty or cleared, so fall back to "native".
batch_format = "native"
# Dataset is empty or cleared, so fall back to "default".
batch_format = "default"
else:
batch_format = (
"pyarrow"
if dataset_format == "arrow"
else "pandas"
if dataset_format == "pandas"
else "native"
else "default"
)
for batch in self.iter_batches(
batch_size=None, prefetch_blocks=prefetch_blocks, batch_format=batch_format
Expand All @@ -2336,7 +2343,7 @@ def iter_batches(
*,
prefetch_blocks: int = 0,
batch_size: Optional[int] = 256,
batch_format: str = "native",
batch_format: str = "default",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
Expand All @@ -2358,11 +2365,11 @@ def iter_batches(
The final batch may include fewer than ``batch_size`` rows if
``drop_last`` is ``False``. Defaults to 256.
batch_format: The format in which to return each batch.
Specify "native" to use the native block format (promoting
Specify "default" to use the default block format (promoting
tables to Pandas and tensors to NumPy), "pandas" to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy"
to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "native".
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
drop_last: Whether to drop the last batch if it's incomplete.
local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
using a local in-memory shuffle buffer, and this value will serve as the
Expand All @@ -2374,6 +2381,12 @@ def iter_batches(
Returns:
An iterator over record batches.
"""
if batch_format == "native":
warnings.warn(
"The 'native' batch format has been renamed 'default'.",
DeprecationWarning,
)

blocks = self._plan.execute()
stats = self._plan.stats()

Expand Down
19 changes: 13 additions & 6 deletions python/ray/data/dataset_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Tuple,
Union,
)
import warnings

import ray
from ray.data._internal import progress_bar
Expand Down Expand Up @@ -137,7 +138,7 @@ def iter_batches(
*,
prefetch_blocks: int = 0,
batch_size: Optional[int] = 256,
batch_format: str = "native",
batch_format: str = "default",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
Expand All @@ -160,10 +161,10 @@ def iter_batches(
The final batch may include fewer than ``batch_size`` rows if
``drop_last`` is ``False``. Defaults to 256.
batch_format: The format in which to return each batch.
Specify "native" to use the current block format (promoting
Specify "default" to use the current block format (promoting
Arrow to pandas automatically), "pandas" to
select ``pandas.DataFrame`` or "pyarrow" to select
``pyarrow.Table``. Default is "native".
``pyarrow.Table``. Default is "default".
drop_last: Whether to drop the last batch if it's incomplete.
local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
using a local in-memory shuffle buffer, and this value will serve as the
Expand All @@ -178,6 +179,12 @@ def iter_batches(
Returns:
An iterator over record batches.
"""
if batch_format == "native":
warnings.warn(
"The 'native' batch format has been renamed 'default'.",
DeprecationWarning,
)

if self._executed[0]:
raise RuntimeError("Pipeline cannot be read multiple times.")
time_start = time.perf_counter()
Expand Down Expand Up @@ -721,7 +728,7 @@ def map_batches(
*,
batch_size: Optional[int] = 4096,
compute: Union[str, ComputeStrategy] = None,
batch_format: str = "native",
batch_format: str = "default",
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
Expand Down Expand Up @@ -951,7 +958,7 @@ def iter_tf_batches(
*,
prefetch_blocks: int = 0,
batch_size: Optional[int] = 256,
batch_format: str = "native",
batch_format: str = "default",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
Expand All @@ -973,7 +980,7 @@ def iter_torch_batches(
*,
prefetch_blocks: int = 0,
batch_size: Optional[int] = 256,
batch_format: str = "native",
batch_format: str = "default",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/grouped_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def map_groups(
fn: Union[CallableClass, Callable[[BatchType], BatchType]],
*,
compute: Union[str, ComputeStrategy] = None,
batch_format: str = "native",
batch_format: str = "default",
**ray_remote_args,
) -> "Dataset[Any]":
# TODO AttributeError: 'GroupedDataset' object has no attribute 'map_groups'
Expand Down Expand Up @@ -230,7 +230,7 @@ def map_groups(
batch of zero or more records, similar to map_batches().
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
batch_format: Specify "native" to use the native block format
batch_format: Specify "default" to use the default block format
(promotes Arrow to pandas), "pandas" to select
``pandas.DataFrame`` as the batch format,
or "pyarrow" to select ``pyarrow.Table``.
Expand Down
6 changes: 5 additions & 1 deletion python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1735,10 +1735,14 @@ def test_iter_batches_basic(ray_start_regular_shared):
assert all(isinstance(col, np.ndarray) for col in batch.values())
pd.testing.assert_frame_equal(pd.DataFrame(batch), df)

# Native format.
# Native format (deprecated).
for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="native"), dfs):
assert BlockAccessor.for_block(batch).to_pandas().equals(df)

# Default format.
for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="default"), dfs):
assert BlockAccessor.for_block(batch).to_pandas().equals(df)

# Batch size.
batch_size = 2
batches = list(ds.iter_batches(batch_size=batch_size, batch_format="pandas"))
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ def equalize_helper(input_block_lists):
for block_ref, _ in blocklist.get_blocks_with_metadata():
block = ray.get(block_ref)
block_accessor = BlockAccessor.for_block(block)
block_list.append(block_accessor.to_native())
block_list.append(block_accessor.to_default())
result_block_lists.append(block_list)
return result_block_lists

Expand Down
4 changes: 2 additions & 2 deletions python/ray/train/tests/test_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ def get_dataset():
for _ in range(num_epochs):
dataset_this_epoch = next(pipeline_iterator)
data_this_epoch = []
for batch in dataset_this_epoch.iter_batches(batch_format="native"):
for batch in dataset_this_epoch.iter_batches(batch_format="default"):
data_this_epoch.extend(batch)
data_all_epochs.append(data_this_epoch)
return data_all_epochs
Expand All @@ -1144,7 +1144,7 @@ def get_dataset():
for _ in range(2):
dataset_this_epoch = next(pipeline_iterator)
data_this_epoch = []
for batch in dataset_this_epoch.iter_batches(batch_format="native"):
for batch in dataset_this_epoch.iter_batches(batch_format="default"):
data_this_epoch.extend(batch)

if len(data_all_epochs) > 0:
Expand Down

0 comments on commit 5043b1e

Please sign in to comment.