Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] Add logical operator for map() #31912

Merged
merged 3 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.compute import (
UDF,
get_compute,
CallableClass,
ComputeStrategy,
TaskPoolStrategy,
ActorPoolStrategy,
)
from ray.data.block import BatchUDF, Block
from ray.data.block import BatchUDF, Block, RowUDF


if sys.version_info >= (3, 8):
Expand All @@ -22,44 +23,97 @@
from typing_extensions import Literal


class AbstractMap(LogicalOperator):
    """Abstract base class for logical operators that are planned into a
    physical ``MapOperator``.

    Holds the state shared by all map-style logical operators (the block
    transform, the user-defined function and its arguments, the compute
    strategy, and Ray remote args); concrete subclasses such as
    ``MapBatches`` and ``MapRows`` supply the operator name and any extras.
    """

    def __init__(
        self,
        name: str,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        target_block_size: Optional[int] = None,
        fn: Optional[UDF] = None,
        fn_args: Optional[Iterable[Any]] = None,
        fn_kwargs: Optional[Dict[str, Any]] = None,
        fn_constructor_args: Optional[Iterable[Any]] = None,
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(name, [input_op])
        self._block_fn = block_fn
        # Default to task-based compute when none is specified.
        self._compute = compute or "tasks"
        self._target_block_size = target_block_size
        self._fn = fn
        self._fn_args = fn_args
        self._fn_kwargs = fn_kwargs
        self._fn_constructor_args = fn_constructor_args
        self._fn_constructor_kwargs = fn_constructor_kwargs
        self._ray_remote_args = ray_remote_args or {}


def plan_map_batches_op(
op: MapBatches, input_physical_dag: PhysicalOperator
) -> PhysicalOperator:
"""Get the corresponding DAG of physical operators for MapBatches."""
class MapBatches(AbstractMap):
    """Logical operator for ``map_batches``."""

    def __init__(
        self,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        fn: BatchUDF,
        batch_size: Optional[Union[int, Literal["default"]]] = "default",
        compute: Optional[Union[str, ComputeStrategy]] = None,
        batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
        zero_copy_batch: bool = False,
        target_block_size: Optional[int] = None,
        fn_args: Optional[Iterable[Any]] = None,
        fn_kwargs: Optional[Dict[str, Any]] = None,
        fn_constructor_args: Optional[Iterable[Any]] = None,
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        # All map-common state is stored by the shared base class.
        super().__init__(
            "MapBatches",
            input_op,
            block_fn,
            compute=compute,
            target_block_size=target_block_size,
            fn=fn,
            fn_args=fn_args,
            fn_kwargs=fn_kwargs,
            fn_constructor_args=fn_constructor_args,
            fn_constructor_kwargs=fn_constructor_kwargs,
            ray_remote_args=ray_remote_args,
        )
        # Batching knobs specific to map_batches (not shared with MapRows).
        self._zero_copy_batch = zero_copy_batch
        self._batch_format = batch_format
        self._batch_size = batch_size


class MapRows(AbstractMap):
    """Logical operator for ``map`` (row-wise transform)."""

    def __init__(
        self,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        fn: RowUDF,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        # Row-wise map needs no extra state beyond the shared base class.
        super().__init__(
            "MapRows",
            input_op,
            block_fn,
            fn=fn,
            compute=compute,
            ray_remote_args=ray_remote_args,
        )


def plan_map_op(op: AbstractMap, input_physical_dag: PhysicalOperator) -> MapOperator:
"""Get the corresponding physical operators DAG for AbstractMap operators."""
compute = get_compute(op._compute)
block_fn = op._block_fn

Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/_internal/logical/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.read_operator import Read, plan_read_op
from ray.data._internal.logical.operators.map_operator import (
MapBatches,
plan_map_batches_op,
AbstractMap,
plan_map_op,
)


Expand All @@ -24,9 +24,9 @@ def plan(self, logical_dag: LogicalOperator) -> PhysicalOperator:
if isinstance(logical_dag, Read):
assert not physical_children
physical_dag = plan_read_op(logical_dag)
elif isinstance(logical_dag, MapBatches):
elif isinstance(logical_dag, AbstractMap):
assert len(physical_children) == 1
physical_dag = plan_map_batches_op(logical_dag, physical_children[0])
physical_dag = plan_map_op(logical_dag, physical_children[0])
else:
raise ValueError(
f"Found unknown logical operator during planning: {logical_dag}"
Expand Down
38 changes: 26 additions & 12 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.util.data_batch_conversion import BlockFormat
from ray.data._internal.logical.optimizers import LogicalPlan
from ray.data._internal.logical.operators.map_operator import MapBatches
from ray.data._internal.logical.operators.map_operator import (
MapRows,
MapBatches,
)
from ray.data.dataset_iterator import DatasetIterator
from ray.data._internal.block_batching import batch_block_refs, batch_blocks
from ray.data._internal.block_list import BlockList
Expand Down Expand Up @@ -334,7 +337,18 @@ def transform(block: Block, fn: RowUDF[T, U]) -> Iterable[Block]:
fn=fn,
)
)
return Dataset(plan, self._epoch, self._lazy)

logical_plan = self._logical_plan
if logical_plan is not None:
map_op = MapRows(
logical_plan.dag,
transform,
fn,
compute=compute,
ray_remote_args=ray_remote_args,
)
logical_plan = LogicalPlan(map_op)
return Dataset(plan, self._epoch, self._lazy, logical_plan)

def map_batches(
self,
Expand Down Expand Up @@ -676,16 +690,16 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:
logical_plan.dag,
transform,
fn,
batch_size,
compute,
batch_format,
zero_copy_batch,
target_block_size,
fn_args,
fn_kwargs,
fn_constructor_args,
fn_constructor_kwargs,
ray_remote_args,
batch_size=batch_size,
compute=compute,
batch_format=batch_format,
zero_copy_batch=zero_copy_batch,
target_block_size=target_block_size,
fn_args=fn_args,
fn_kwargs=fn_kwargs,
fn_constructor_args=fn_constructor_args,
fn_constructor_kwargs=fn_constructor_kwargs,
ray_remote_args=ray_remote_args,
)
logical_plan = LogicalPlan(map_batches_op)

Expand Down
12 changes: 12 additions & 0 deletions python/ray/data/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,18 @@ def target_max_block_size(request):
ctx.target_max_block_size = original


@pytest.fixture(params=[True])
def enable_optimizer(request):
    """Turn on the new execution backend and logical optimizer for a test,
    restoring the previous context settings afterwards.
    """
    ctx = ray.data.context.DatasetContext.get_current()
    saved = (ctx.new_execution_backend, ctx.optimizer_enabled)
    ctx.new_execution_backend = ctx.optimizer_enabled = request.param
    yield request.param
    ctx.new_execution_backend, ctx.optimizer_enabled = saved


# ===== Pandas dataset formats =====
@pytest.fixture(scope="function")
def ds_pandas_single_column_format(ray_start_regular_shared):
Expand Down
63 changes: 56 additions & 7 deletions python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,64 @@
import pytest

import ray
from ray.data.context import DatasetContext
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.map_operator import MapRows, MapBatches
from ray.data._internal.logical.planner import Planner
from ray.data.datasource.parquet_datasource import ParquetDatasource

from ray.tests.conftest import * # noqa

def test_e2e_optimizer_sanity(ray_start_cluster_enabled, enable_optimizer):
    """End-to-end sanity check: chained map_batches + map produce correct
    results under the optimizer-enabled execution backend.

    Note: the scrape retained both the old and new versions of this test
    (the old one set the DatasetContext flags inline); only the new version,
    which relies on the ``enable_optimizer`` fixture, is kept.
    """
    ds = ray.data.range(5)
    ds = ds.map_batches(lambda x: x)
    ds = ds.map(lambda x: x + 1)
    assert ds.take_all() == [1, 2, 3, 4, 5], ds


def test_read_operator(ray_start_cluster_enabled, enable_optimizer):
    """Planning a Read logical op yields a MapOperator fed by an
    InputDataBuffer."""
    read_op = Read(ParquetDatasource())
    physical = Planner().plan(read_op)

    assert read_op.name == "Read"
    assert isinstance(physical, MapOperator)
    inputs = physical.input_dependencies
    assert len(inputs) == 1
    assert isinstance(inputs[0], InputDataBuffer)


def test_map_batches_operator(ray_start_cluster_enabled, enable_optimizer):
    """Planning MapBatches over a Read yields a MapOperator whose single
    input is the upstream physical MapOperator."""
    logical_op = MapBatches(
        Read(ParquetDatasource()),
        lambda batches: (b for b in batches),
        lambda batch: batch,
    )
    physical = Planner().plan(logical_op)

    assert logical_op.name == "MapBatches"
    assert isinstance(physical, MapOperator)
    inputs = physical.input_dependencies
    assert len(inputs) == 1
    assert isinstance(inputs[0], MapOperator)


def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer):
    """Planning MapRows over a Read yields a MapOperator whose single
    input is the upstream physical MapOperator."""
    logical_op = MapRows(
        Read(ParquetDatasource()),
        lambda blocks: (b for b in blocks),
        lambda row: row,
    )
    physical = Planner().plan(logical_op)

    assert logical_op.name == "MapRows"
    assert isinstance(physical, MapOperator)
    inputs = physical.input_dependencies
    assert len(inputs) == 1
    assert isinstance(inputs[0], MapOperator)


if __name__ == "__main__":
Expand Down