Revert "[Data] Implement Operators for union()" #36583

Merged · 1 commit · Jun 20, 2023
python/ray/data/_internal/execution/bulk_executor.py (1 addition, 1 deletion)

@@ -62,7 +62,7 @@ def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
         for i, ref_bundles in enumerate(inputs):
             for r in ref_bundles:
                 op.add_input(r, input_index=i)
-        op.all_inputs_done()
+        op.inputs_done()
         output = _naive_run_until_complete(op)
     finally:
         op.shutdown()
python/ray/data/_internal/execution/interfaces.py (1 addition, 9 deletions)

@@ -396,15 +396,7 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
         """
         raise NotImplementedError

-    def input_done(self, input_index: int) -> None:
-        """Called when the upstream operator at index `input_index` has completed().
-
-        After this is called, the executor guarantees that no more inputs will be added
-        via `add_input` for the given input index.
-        """
-        pass
-
-    def all_inputs_done(self) -> None:
+    def inputs_done(self) -> None:
         """Called when all upstream operators have completed().

         After this is called, the executor guarantees that no more inputs will be added
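
The docstring above is the whole contract: once `inputs_done()` fires, the executor guarantees no further `add_input()` calls, so it is the one safe point to finalize buffered state. A minimal self-contained sketch of an operator honoring that contract; `BufferingOperator` is a hypothetical plain-Python stand-in, not Ray's `PhysicalOperator`:

# Hypothetical stand-in illustrating the inputs_done() contract restored by
# this revert; not Ray's actual PhysicalOperator API.
from typing import List


class BufferingOperator:
    def __init__(self) -> None:
        self._inputs: List[str] = []
        self._outputs: List[str] = []

    def add_input(self, ref: str, input_index: int) -> None:
        # The executor may call this any number of times, for any input index.
        self._inputs.append(ref)

    def inputs_done(self) -> None:
        # Called once, after all upstream operators complete. Safe to finalize:
        # no more add_input() calls will arrive for any index.
        self._outputs = [ref.upper() for ref in self._inputs]
        self._inputs.clear()

    def has_next(self) -> bool:
        return bool(self._outputs)


op = BufferingOperator()
op.add_input("a", input_index=0)
op.add_input("b", input_index=1)
op.inputs_done()
print(op.has_next())  # True

Note that the revert also drops the finer-grained per-index `input_done(input_index)` hook entirely; end-of-input is now signaled only in aggregate.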
python/ray/data/_internal/execution/legacy_compat.py (2 additions, 4 deletions)

@@ -14,12 +14,10 @@
     RefBundle,
     TaskContext,
 )
-from ray.data._internal.execution.operators.base_physical_operator import (
-    AllToAllOperator,
-)
+from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator
 from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
-from ray.data._internal.execution.operators.limit_operator import LimitOperator
 from ray.data._internal.execution.operators.map_operator import MapOperator
+from ray.data._internal.execution.operators.one_to_one_operator import LimitOperator
 from ray.data._internal.execution.util import make_callable_class_concurrent
 from ray.data._internal.lazy_block_list import LazyBlockList
 from ray.data._internal.logical.optimizers import get_execution_plan
(file path not shown)

@@ -230,10 +230,10 @@ def notify_work_completed(
         # For either a completed task or ready worker, we try to dispatch queued tasks.
         self._dispatch_tasks()

-    def all_inputs_done(self):
+    def inputs_done(self):
         # Call base implementation to handle any leftover bundles. This may or may not
         # trigger task dispatch.
-        super().all_inputs_done()
+        super().inputs_done()

         # Mark inputs as done so future task dispatch will kill all inactive workers
         # once the bundle queue is exhausted.
(file path not shown)

@@ -6,34 +6,10 @@
     RefBundle,
     TaskContext,
 )
-from ray.data._internal.logical.interfaces import LogicalOperator
 from ray.data._internal.progress_bar import ProgressBar
 from ray.data._internal.stats import StatsDict


-class OneToOneOperator(PhysicalOperator):
-    """An operator that has one input and one output dependency.
-
-    This operator serves as the base for map, filter, limit, etc.
-    """
-
-    def __init__(
-        self,
-        name: str,
-        input_op: PhysicalOperator,
-    ):
-        """Create a OneToOneOperator.
-        Args:
-            input_op: Operator generating input data for this op.
-            name: The name of this operator.
-        """
-        super().__init__(name, [input_op])
-
-    @property
-    def input_dependency(self) -> PhysicalOperator:
-        return self.input_dependencies[0]
-
-
 class AllToAllOperator(PhysicalOperator):
     """A blocking operator that executes once its inputs are complete.

@@ -49,6 +25,7 @@ def __init__(
         name: str = "AllToAll",
     ):
         """Create an AllToAllOperator.
+
         Args:
             bulk_fn: The blocking transformation function to run. The inputs are the
                 list of input ref bundles, and the outputs are the output ref bundles

@@ -80,15 +57,15 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
         assert input_index == 0, input_index
         self._input_buffer.append(refs)

-    def all_inputs_done(self) -> None:
+    def inputs_done(self) -> None:
         ctx = TaskContext(
             task_idx=self._next_task_index,
             sub_progress_bar_dict=self._sub_progress_bar_dict,
         )
         self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
         self._next_task_index += 1
         self._input_buffer.clear()
-        super().all_inputs_done()
+        super().inputs_done()

     def has_next(self) -> bool:
         return len(self._output_buffer) > 0

@@ -125,23 +102,3 @@ def close_sub_progress_bars(self):
         if self._sub_progress_bar_dict is not None:
             for sub_bar in self._sub_progress_bar_dict.values():
                 sub_bar.close()
-
-
-class NAryOperator(PhysicalOperator):
-    """An operator that has multiple input dependencies and one output.
-
-    This operator serves as the base for union, zip, etc.
-    """
-
-    def __init__(
-        self,
-        *input_ops: LogicalOperator,
-    ):
-        """Create a OneToOneOperator.
-        Args:
-            input_op: Operator generating input data for this op.
-            name: The name of this operator.
-        """
-        input_names = ", ".join([op._name for op in input_ops])
-        op_name = f"{self.__class__.__name__}({input_names})"
-        super().__init__(op_name, list(input_ops))
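
The `inputs_done()` override above is the point where `AllToAllOperator` finally invokes its blocking `bulk_fn` over everything it buffered. A rough sketch of the shape such a function takes, with plain lists and dicts standing in for `RefBundle`, `TaskContext`, and `StatsDict`; `sort_bulk_fn` is an invented example, not code from this PR:

# Invented example of a bulk_fn: receives every buffered input bundle at once
# plus a task context, returns (output_bundles, stats). Plain-Python stand-ins.
from typing import Any, Dict, List, Tuple


def sort_bulk_fn(
    input_bundles: List[List[int]], ctx: Dict[str, Any]
) -> Tuple[List[List[int]], Dict[str, Any]]:
    # A blocking, whole-dataset transform: flatten, sort, re-bundle.
    rows = sorted(r for bundle in input_bundles for r in bundle)
    return [rows], {"sort": {"task_idx": ctx["task_idx"]}}


outputs, stats = sort_bulk_fn([[3, 1], [2]], {"task_idx": 0})
print(outputs)  # [[1, 2, 3]]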
(file path not shown)

@@ -19,9 +19,7 @@
     RefBundle,
     TaskContext,
 )
-from ray.data._internal.execution.operators.base_physical_operator import (
-    OneToOneOperator,
-)
+from ray.data._internal.execution.operators.one_to_one_operator import OneToOneOperator
 from ray.data._internal.memory_tracing import trace_allocation
 from ray.data._internal.stats import StatsDict
 from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata

@@ -282,13 +280,13 @@ def _handle_task_done(self, task: "_TaskState"):
         if self._metrics.cur > self._metrics.peak:
             self._metrics.peak = self._metrics.cur

-    def all_inputs_done(self):
+    def inputs_done(self):
         self._block_ref_bundler.done_adding_bundles()
         if self._block_ref_bundler.has_bundle():
             # Handle any leftover bundles in the bundler.
             bundle = self._block_ref_bundler.get_next_bundle()
             self._add_bundled_input(bundle)
-        super().all_inputs_done()
+        super().inputs_done()

     def has_next(self) -> bool:
         assert self._started
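
The `inputs_done()` body above shows the other common use of the hook: flushing a leftover partial bundle that the bundler is still holding once no more input can arrive. A self-contained sketch of that flush pattern; `Bundler` is a hypothetical stand-in for the `_block_ref_bundler` seen in the diff:

# Hypothetical stand-in for the end-of-input flush pattern in MapOperator:
# a bundler accumulates rows into fixed-size bundles and may hold a partial
# remainder that must be flushed once input is finished.
from typing import List


class Bundler:
    def __init__(self, bundle_size: int) -> None:
        self._size = bundle_size
        self._pending: List[int] = []
        self._done = False

    def add(self, row: int) -> None:
        self._pending.append(row)

    def done_adding(self) -> None:
        self._done = True

    def has_bundle(self) -> bool:
        # A full bundle, or any remainder once input is finished.
        return len(self._pending) >= self._size or (self._done and bool(self._pending))

    def get_next_bundle(self) -> List[int]:
        bundle, self._pending = self._pending[: self._size], self._pending[self._size :]
        return bundle


b = Bundler(bundle_size=2)
for row in [1, 2, 3]:
    b.add(row)
b.done_adding()  # the inputs_done() analogue
while b.has_bundle():
    print(b.get_next_bundle())  # [1, 2], then the leftover [3]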
(file path not shown)

@@ -4,15 +4,35 @@

 import ray
 from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
-from ray.data._internal.execution.operators.base_physical_operator import (
-    OneToOneOperator,
-)
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.stats import StatsDict
 from ray.data.block import Block, BlockAccessor, BlockMetadata
 from ray.types import ObjectRef


+class OneToOneOperator(PhysicalOperator):
+    """An operator that has one input and one output dependency.
+    This operator serves as the base for map, filter, limit, etc.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        input_op: PhysicalOperator,
+    ):
+        """Create a OneToOneOperator.
+
+        Args:
+            input_op: Operator generating input data for this op.
+            name: The name of this operator.
+        """
+        super().__init__(name, [input_op])
+
+    @property
+    def input_dependency(self) -> PhysicalOperator:
+        return self.input_dependencies[0]
+
+
 class LimitOperator(OneToOneOperator):
     """Physical operator for limit."""

@@ -29,7 +49,7 @@ def __init__(
         self._cur_output_bundles = 0
         super().__init__(self._name, input_op)
         if self._limit <= 0:
-            self.all_inputs_done()
+            self.inputs_done()

     def _limit_reached(self) -> bool:
         return self._consumed_rows >= self._limit

@@ -79,7 +99,7 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
             )
             self._buffer.append(out_refs)
             if self._limit_reached():
-                self.all_inputs_done()
+                self.inputs_done()

     def has_next(self) -> bool:
         return len(self._buffer) > 0
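
`LimitOperator` uses the hook in the opposite direction: it calls its own `inputs_done()` as soon as the limit is reached, or immediately when `limit <= 0`, so the operator reports completion without waiting for upstream. A toy sketch of that early-stop behavior; `Limiter` is hypothetical, not Ray's implementation:

# Toy sketch of LimitOperator's early-stop behavior, assuming simple lists of
# rows instead of RefBundles. Limiter is an invented stand-in.
from typing import List


class Limiter:
    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._consumed = 0
        self._output: List[int] = []
        self._inputs_complete = False
        if self._limit <= 0:
            # Mirrors the diff above: a non-positive limit finishes immediately.
            self.inputs_done()

    def inputs_done(self) -> None:
        self._inputs_complete = True

    def add_input(self, rows: List[int]) -> None:
        if self._inputs_complete:
            return
        take = min(len(rows), self._limit - self._consumed)
        self._output.extend(rows[:take])
        self._consumed += take
        if self._consumed >= self._limit:
            # Limit reached: declare inputs done without waiting for upstream.
            self.inputs_done()


lim = Limiter(limit=3)
lim.add_input([1, 2])
lim.add_input([3, 4, 5])  # truncated to one row; triggers inputs_done()
print(lim._output)        # [1, 2, 3]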
(file path not shown)

@@ -101,8 +101,8 @@ def add_input(self, bundle, input_index) -> None:
         self._buffer.append(bundle)
         self._dispatch_bundles()

-    def all_inputs_done(self) -> None:
-        super().all_inputs_done()
+    def inputs_done(self) -> None:
+        super().inputs_done()
         if not self._equal:
             self._dispatch_bundles(dispatch_all=True)
         assert not self._buffer, "Should have dispatched all bundles."
python/ray/data/_internal/execution/operators/union_operator.py (0 additions, 107 deletions)

This file was deleted.
(file path not shown)

@@ -59,13 +59,13 @@ def add_input(self, refs: RefBundle, input_index: int) -> None:
         else:
             self._right_buffer.append(refs)

-    def all_inputs_done(self) -> None:
+    def inputs_done(self) -> None:
         self._output_buffer, self._stats = self._zip(
             self._left_buffer, self._right_buffer
         )
         self._left_buffer.clear()
         self._right_buffer.clear()
-        super().all_inputs_done()
+        super().inputs_done()

     def has_next(self) -> bool:
         return len(self._output_buffer) > 0
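
The zip operator follows the same buffer-then-finalize pattern as the all-to-all case, but across two input indexes: nothing can be emitted until both sides are complete. A minimal stand-in sketch (plain Python, not Ray's classes):

# Minimal stand-in for the two-sided buffering contract: both inputs are held
# until inputs_done() joins them pairwise and clears the buffers.
from typing import Any, List, Tuple


class Zipper:
    def __init__(self) -> None:
        self._left: List[Any] = []
        self._right: List[Any] = []
        self._output: List[Tuple[Any, Any]] = []

    def add_input(self, item: Any, input_index: int) -> None:
        (self._left if input_index == 0 else self._right).append(item)

    def inputs_done(self) -> None:
        self._output = list(zip(self._left, self._right))
        self._left.clear()
        self._right.clear()


z = Zipper()
for x in [1, 2]:
    z.add_input(x, input_index=0)
for y in ["a", "b"]:
    z.add_input(y, input_index=1)
z.inputs_done()
print(z._output)  # [(1, 'a'), (2, 'b')]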
python/ray/data/_internal/execution/streaming_executor.py (0 additions, 3 deletions)

@@ -27,7 +27,6 @@
     build_streaming_topology,
     process_completed_tasks,
     select_operator_to_run,
-    update_operator_states,
 )
 from ray.data._internal.progress_bar import ProgressBar
 from ray.data._internal.stats import DatasetStats

@@ -258,8 +257,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
                 autoscaling_state=self._autoscaling_state,
             )

-            update_operator_states(topology)
-
             # Update the progress bar to reflect scheduling decisions.
             for op_state in topology.values():
                 op_state.refresh_progress_bar()