diff --git a/python/ray/data/_internal/execution/bulk_executor.py b/python/ray/data/_internal/execution/bulk_executor.py index e1fa4df1c220..f3b3c5f66ee5 100644 --- a/python/ray/data/_internal/execution/bulk_executor.py +++ b/python/ray/data/_internal/execution/bulk_executor.py @@ -62,7 +62,7 @@ def execute_recursive(op: PhysicalOperator) -> List[RefBundle]: for i, ref_bundles in enumerate(inputs): for r in ref_bundles: op.add_input(r, input_index=i) - op.all_inputs_done() + op.inputs_done() output = _naive_run_until_complete(op) finally: op.shutdown() diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 29652bceb6fa..de53c2b2458f 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -396,15 +396,7 @@ def add_input(self, refs: RefBundle, input_index: int) -> None: """ raise NotImplementedError - def input_done(self, input_index: int) -> None: - """Called when the upstream operator at index `input_index` has completed(). - - After this is called, the executor guarantees that no more inputs will be added - via `add_input` for the given input index. - """ - pass - - def all_inputs_done(self) -> None: + def inputs_done(self) -> None: """Called when all upstream operators have completed(). After this is called, the executor guarantees that no more inputs will be added diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 1fdf28a050e2..9dac8411be71 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -14,12 +14,10 @@ RefBundle, TaskContext, ) -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.limit_operator import LimitOperator from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.one_to_one_operator import LimitOperator from ray.data._internal.execution.util import make_callable_class_concurrent from ray.data._internal.lazy_block_list import LazyBlockList from ray.data._internal.logical.optimizers import get_execution_plan diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index c0d6e0d8627c..08450657e62e 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -230,10 +230,10 @@ def notify_work_completed( # For either a completed task or ready worker, we try to dispatch queued tasks. self._dispatch_tasks() - def all_inputs_done(self): + def inputs_done(self): # Call base implementation to handle any leftover bundles. This may or may not # trigger task dispatch. - super().all_inputs_done() + super().inputs_done() # Mark inputs as done so future task dispatch will kill all inactive workers # once the bundle queue is exhausted. diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/all_to_all_operator.py similarity index 73% rename from python/ray/data/_internal/execution/operators/base_physical_operator.py rename to python/ray/data/_internal/execution/operators/all_to_all_operator.py index 26e23416a8eb..dfe7a4107e34 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/all_to_all_operator.py @@ -6,34 +6,10 @@ RefBundle, TaskContext, ) -from ray.data._internal.logical.interfaces import LogicalOperator from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import StatsDict -class OneToOneOperator(PhysicalOperator): - """An operator that has one input and one output dependency. - - This operator serves as the base for map, filter, limit, etc. - """ - - def __init__( - self, - name: str, - input_op: PhysicalOperator, - ): - """Create a OneToOneOperator. - Args: - input_op: Operator generating input data for this op. - name: The name of this operator. - """ - super().__init__(name, [input_op]) - - @property - def input_dependency(self) -> PhysicalOperator: - return self.input_dependencies[0] - - class AllToAllOperator(PhysicalOperator): """A blocking operator that executes once its inputs are complete. @@ -49,6 +25,7 @@ def __init__( name: str = "AllToAll", ): """Create an AllToAllOperator. + Args: bulk_fn: The blocking transformation function to run. The inputs are the list of input ref bundles, and the outputs are the output ref bundles @@ -80,7 +57,7 @@ def add_input(self, refs: RefBundle, input_index: int) -> None: assert input_index == 0, input_index self._input_buffer.append(refs) - def all_inputs_done(self) -> None: + def inputs_done(self) -> None: ctx = TaskContext( task_idx=self._next_task_index, sub_progress_bar_dict=self._sub_progress_bar_dict, @@ -88,7 +65,7 @@ def all_inputs_done(self) -> None: self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx) self._next_task_index += 1 self._input_buffer.clear() - super().all_inputs_done() + super().inputs_done() def has_next(self) -> bool: return len(self._output_buffer) > 0 @@ -125,23 +102,3 @@ def close_sub_progress_bars(self): if self._sub_progress_bar_dict is not None: for sub_bar in self._sub_progress_bar_dict.values(): sub_bar.close() - - -class NAryOperator(PhysicalOperator): - """An operator that has multiple input dependencies and one output. - - This operator serves as the base for union, zip, etc. - """ - - def __init__( - self, - *input_ops: LogicalOperator, - ): - """Create a OneToOneOperator. - Args: - input_op: Operator generating input data for this op. - name: The name of this operator. - """ - input_names = ", ".join([op._name for op in input_ops]) - op_name = f"{self.__class__.__name__}({input_names})" - super().__init__(op_name, list(input_ops)) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index d431b7fb5745..df6764c69b51 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -19,9 +19,7 @@ RefBundle, TaskContext, ) -from ray.data._internal.execution.operators.base_physical_operator import ( - OneToOneOperator, -) +from ray.data._internal.execution.operators.one_to_one_operator import OneToOneOperator from ray.data._internal.memory_tracing import trace_allocation from ray.data._internal.stats import StatsDict from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata @@ -282,13 +280,13 @@ def _handle_task_done(self, task: "_TaskState"): if self._metrics.cur > self._metrics.peak: self._metrics.peak = self._metrics.cur - def all_inputs_done(self): + def inputs_done(self): self._block_ref_bundler.done_adding_bundles() if self._block_ref_bundler.has_bundle(): # Handle any leftover bundles in the bundler. bundle = self._block_ref_bundler.get_next_bundle() self._add_bundled_input(bundle) - super().all_inputs_done() + super().inputs_done() def has_next(self) -> bool: assert self._started diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/one_to_one_operator.py similarity index 83% rename from python/ray/data/_internal/execution/operators/limit_operator.py rename to python/ray/data/_internal/execution/operators/one_to_one_operator.py index 2c06531e9e1f..62e0b7d8ec47 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/one_to_one_operator.py @@ -4,15 +4,35 @@ import ray from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle -from ray.data._internal.execution.operators.base_physical_operator import ( - OneToOneOperator, -) from ray.data._internal.remote_fn import cached_remote_fn from ray.data._internal.stats import StatsDict from ray.data.block import Block, BlockAccessor, BlockMetadata from ray.types import ObjectRef +class OneToOneOperator(PhysicalOperator): + """An operator that has one input and one output dependency. + This operator serves as the base for map, filter, limit, etc. + """ + + def __init__( + self, + name: str, + input_op: PhysicalOperator, + ): + """Create a OneToOneOperator. + + Args: + input_op: Operator generating input data for this op. + name: The name of this operator. + """ + super().__init__(name, [input_op]) + + @property + def input_dependency(self) -> PhysicalOperator: + return self.input_dependencies[0] + + class LimitOperator(OneToOneOperator): """Physical operator for limit.""" @@ -29,7 +49,7 @@ def __init__( self._cur_output_bundles = 0 super().__init__(self._name, input_op) if self._limit <= 0: - self.all_inputs_done() + self.inputs_done() def _limit_reached(self) -> bool: return self._consumed_rows >= self._limit @@ -79,7 +99,7 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]: ) self._buffer.append(out_refs) if self._limit_reached(): - self.all_inputs_done() + self.inputs_done() def has_next(self) -> bool: return len(self._buffer) > 0 diff --git a/python/ray/data/_internal/execution/operators/output_splitter.py b/python/ray/data/_internal/execution/operators/output_splitter.py index 020fc509e4fe..f04529974c99 100644 --- a/python/ray/data/_internal/execution/operators/output_splitter.py +++ b/python/ray/data/_internal/execution/operators/output_splitter.py @@ -101,8 +101,8 @@ def add_input(self, bundle, input_index) -> None: self._buffer.append(bundle) self._dispatch_bundles() - def all_inputs_done(self) -> None: - super().all_inputs_done() + def inputs_done(self) -> None: + super().inputs_done() if not self._equal: self._dispatch_bundles(dispatch_all=True) assert not self._buffer, "Should have dispatched all bundles." diff --git a/python/ray/data/_internal/execution/operators/union_operator.py b/python/ray/data/_internal/execution/operators/union_operator.py deleted file mode 100644 index df47063d4fdb..000000000000 --- a/python/ray/data/_internal/execution/operators/union_operator.py +++ /dev/null @@ -1,107 +0,0 @@ -from typing import List, Optional - -from ray.data._internal.execution.interfaces import ( - ExecutionOptions, - PhysicalOperator, - RefBundle, -) -from ray.data._internal.execution.operators.base_physical_operator import NAryOperator -from ray.data._internal.stats import StatsDict - - -class UnionOperator(NAryOperator): - """An operator that combines output blocks from - two or more input operators into a single output.""" - - def __init__( - self, - *input_ops: PhysicalOperator, - ): - """Create a UnionOperator. - - Args: - input_ops: Operators generating input data for this operator to union. - """ - - # By default, union does not preserve the order of output blocks. - # To preserve the order, configure ExecutionOptions accordingly. - self._preserve_order = False - - # Intermediary buffers used to store blocks from each input dependency. - # Only used when `self._prserve_order` is True. - self._input_buffers: List[List[RefBundle]] = [[] for _ in range(len(input_ops))] - - # The index of the input dependency that is currently the source of - # the output buffer. New inputs from this input dependency will be added - # directly to the output buffer. Only used when `self._preserve_order` is True. - self._input_idx_to_output = 0 - - self._output_buffer: List[RefBundle] = [] - self._stats: StatsDict = {} - super().__init__(*input_ops) - - def start(self, options: ExecutionOptions): - # Whether to preserve the order of the input data (both the - # order of the input operators and the order of the blocks within). - self._preserve_order = options.preserve_order - super().start(options) - - def num_outputs_total(self) -> Optional[int]: - num_outputs = 0 - for input_op in self.input_dependencies: - op_num_outputs = input_op.num_outputs_total() - # If at least one of the input ops has an unknown number of outputs, - # the number of outputs of the union operator is unknown. - if op_num_outputs is None: - return None - num_outputs += op_num_outputs - return num_outputs - - def add_input(self, refs: RefBundle, input_index: int) -> None: - assert not self.completed() - assert 0 <= input_index <= len(self._input_dependencies), input_index - - if not self._preserve_order: - self._output_buffer.append(refs) - else: - if input_index == self._input_idx_to_output: - self._output_buffer.append(refs) - else: - self._input_buffers[input_index].append(refs) - - def input_done(self, input_index: int) -> None: - """When `self._preserve_order` is True, change the - output buffer source to the next input dependency - once the current input dependency calls `input_done()`.""" - if not self._preserve_order: - return - if not input_index == self._input_idx_to_output: - return - next_input_idx = self._input_idx_to_output + 1 - if next_input_idx < len(self._input_buffers): - self._output_buffer.extend(self._input_buffers[next_input_idx]) - self._input_buffers[next_input_idx].clear() - self._input_idx_to_output = next_input_idx - super().input_done(input_index) - - def all_inputs_done(self) -> None: - # Note that in the case where order is not preserved, all inputs - # are directly added to the output buffer as soon as they are received, - # so there is no need to check any intermediary buffers. - if self._preserve_order: - for idx, input_buffer in enumerate(self._input_buffers): - assert len(input_buffer) == 0, ( - f"Input at index {idx} still has " - f"{len(input_buffer)} blocks remaining." - ) - super().all_inputs_done() - - def has_next(self) -> bool: - # Check if the output buffer still contains at least one block. - return len(self._output_buffer) > 0 - - def get_next(self) -> RefBundle: - return self._output_buffer.pop(0) - - def get_stats(self) -> StatsDict: - return self._stats diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index a79b91199c8b..737ac5124d2b 100644 --- a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -59,13 +59,13 @@ def add_input(self, refs: RefBundle, input_index: int) -> None: else: self._right_buffer.append(refs) - def all_inputs_done(self) -> None: + def inputs_done(self) -> None: self._output_buffer, self._stats = self._zip( self._left_buffer, self._right_buffer ) self._left_buffer.clear() self._right_buffer.clear() - super().all_inputs_done() + super().inputs_done() def has_next(self) -> bool: return len(self._output_buffer) > 0 diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 152fea508b76..4958b93f8ad2 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -27,7 +27,6 @@ build_streaming_topology, process_completed_tasks, select_operator_to_run, - update_operator_states, ) from ray.data._internal.progress_bar import ProgressBar from ray.data._internal.stats import DatasetStats @@ -258,8 +257,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: autoscaling_state=self._autoscaling_state, ) - update_operator_states(topology) - # Update the progress bar to reflect scheduling decisions. for op_state in topology.values(): op_state.refresh_progress_bar() diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 1a2ab01e47ac..0528ec3ad0d2 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -19,9 +19,7 @@ PhysicalOperator, RefBundle, ) -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.util import memory_string from ray.data._internal.progress_bar import ProgressBar @@ -122,8 +120,6 @@ def __init__(self, op: PhysicalOperator, inqueues: List[Deque[MaybeRefBundle]]): self.progress_bar = None self.num_completed_tasks = 0 self.inputs_done_called = False - # Tracks whether `input_done` is called for each input op. - self.input_done_called = [False] * len(op.input_dependencies) self.dependents_completed_called = False def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: @@ -310,8 +306,7 @@ def setup_state(op: PhysicalOperator) -> OpState: def process_completed_tasks(topology: Topology) -> None: - """Process any newly completed tasks. To update operator - states, call `update_operator_states()` afterwards.""" + """Process any newly completed tasks and update operator state.""" # Update active tasks. active_tasks: Dict[ray.ObjectRef, PhysicalOperator] = {} @@ -337,26 +332,18 @@ def process_completed_tasks(topology: Topology) -> None: while op.has_next(): op_state.add_output(op.get_next()) - -def update_operator_states(topology: Topology) -> None: - """Update operator states accordingly for newly completed tasks. - Should be called after `process_completed_tasks()`.""" - # Call inputs_done() on ops where no more inputs are coming. for op, op_state in topology.items(): if op_state.inputs_done_called: continue - all_inputs_done = True - for idx, dep in enumerate(op.input_dependencies): - if dep.completed() and not topology[dep].outqueue: - if not op_state.input_done_called[idx]: - op.input_done(idx) - op_state.input_done_called[idx] = True - else: - all_inputs_done = False - - if all_inputs_done: - op.all_inputs_done() + inputs_done = all( + [ + dep.completed() and not topology[dep].outqueue + for dep in op.input_dependencies + ] + ) + if inputs_done: + op.inputs_done() op_state.inputs_done_called = True # Traverse the topology in reverse topological order. diff --git a/python/ray/data/_internal/logical/operators/n_ary_operator.py b/python/ray/data/_internal/logical/operators/n_ary_operator.py index 0d695c616719..145979f5ee3b 100644 --- a/python/ray/data/_internal/logical/operators/n_ary_operator.py +++ b/python/ray/data/_internal/logical/operators/n_ary_operator.py @@ -1,21 +1,7 @@ from ray.data._internal.logical.interfaces import LogicalOperator -class NAry(LogicalOperator): - """Base class for n-ary operators, which take multiple input operators.""" - - def __init__( - self, - *input_ops: LogicalOperator, - ): - """ - Args: - input_ops: The input operators. - """ - super().__init__(self.__class__.__name__, list(input_ops)) - - -class Zip(NAry): +class Zip(LogicalOperator): """Logical operator for zip.""" def __init__( @@ -28,14 +14,4 @@ def __init__( left_input_ops: The input operator at left hand side. right_input_op: The input operator at right hand side. """ - super().__init__(left_input_op, right_input_op) - - -class Union(NAry): - """Logical operator for union.""" - - def __init__( - self, - *input_ops: LogicalOperator, - ): - super().__init__(*input_ops) + super().__init__("Zip", [left_input_op, right_input_op]) diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py index 5ac00a7cdb44..1f9cddd4f257 100644 --- a/python/ray/data/_internal/logical/rules/operator_fusion.py +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -10,9 +10,7 @@ from ray.data._internal.execution.operators.actor_pool_map_operator import ( ActorPoolMapOperator, ) -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.task_pool_map_operator import ( TaskPoolMapOperator, diff --git a/python/ray/data/_internal/planner/plan_all_to_all_op.py b/python/ray/data/_internal/planner/plan_all_to_all_op.py index 02c111d45319..749967563ec0 100644 --- a/python/ray/data/_internal/planner/plan_all_to_all_op.py +++ b/python/ray/data/_internal/planner/plan_all_to_all_op.py @@ -1,7 +1,5 @@ from ray.data._internal.execution.interfaces import PhysicalOperator -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.logical.operators.all_to_all_operator import ( AbstractAllToAll, Aggregate, diff --git a/python/ray/data/_internal/planner/plan_limit_op.py b/python/ray/data/_internal/planner/plan_limit_op.py index edd92e798752..31e97a5f2c74 100644 --- a/python/ray/data/_internal/planner/plan_limit_op.py +++ b/python/ray/data/_internal/planner/plan_limit_op.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from ray.data._internal.execution.operators.limit_operator import LimitOperator +from ray.data._internal.execution.operators.one_to_one_operator import LimitOperator if TYPE_CHECKING: from ray.data._internal.execution.interfaces import PhysicalOperator diff --git a/python/ray/data/_internal/planner/planner.py b/python/ray/data/_internal/planner/planner.py index 0efbefd40d11..ab8794286d2a 100644 --- a/python/ray/data/_internal/planner/planner.py +++ b/python/ray/data/_internal/planner/planner.py @@ -1,7 +1,6 @@ from typing import Dict from ray.data._internal.execution.interfaces import PhysicalOperator -from ray.data._internal.execution.operators.union_operator import UnionOperator from ray.data._internal.execution.operators.zip_operator import ZipOperator from ray.data._internal.logical.interfaces import ( LogicalOperator, @@ -12,7 +11,7 @@ from ray.data._internal.logical.operators.from_operators import AbstractFrom from ray.data._internal.logical.operators.input_data_operator import InputData from ray.data._internal.logical.operators.map_operator import AbstractUDFMap -from ray.data._internal.logical.operators.n_ary_operator import Union, Zip +from ray.data._internal.logical.operators.n_ary_operator import Zip from ray.data._internal.logical.operators.one_to_one_operator import Limit from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.logical.operators.write_operator import Write @@ -67,9 +66,6 @@ def _plan(self, logical_op: LogicalOperator) -> PhysicalOperator: elif isinstance(logical_op, Zip): assert len(physical_children) == 2 physical_op = ZipOperator(physical_children[0], physical_children[1]) - elif isinstance(logical_op, Union): - assert len(physical_children) >= 2 - physical_op = UnionOperator(*physical_children) elif isinstance(logical_op, Limit): assert len(physical_children) == 1 physical_op = _plan_limit_op(logical_op, physical_children[0]) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6a189bc1288f..7e1d64226459 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -57,9 +57,6 @@ MapBatches, MapRows, ) -from ray.data._internal.logical.operators.n_ary_operator import ( - Union as UnionLogicalOperator, -) from ray.data._internal.logical.operators.n_ary_operator import Zip from ray.data._internal.logical.operators.one_to_one_operator import Limit from ray.data._internal.logical.operators.write_operator import Write @@ -1618,25 +1615,14 @@ def union(self, *other: List["Dataset"]) -> "Dataset": if has_nonlazy: blocks = [] metadata = [] - ops_to_union = [] - for idx, bl in enumerate(bls): + for bl in bls: if isinstance(bl, LazyBlockList): bs, ms = bl._get_blocks_with_metadata() else: - assert isinstance(bl, BlockList), type(bl) bs, ms = bl._blocks, bl._metadata - op_logical_plan = getattr(datasets[idx]._plan, "_logical_plan", None) - if isinstance(op_logical_plan, LogicalPlan): - ops_to_union.append(op_logical_plan.dag) - else: - ops_to_union.append(None) blocks.extend(bs) metadata.extend(ms) blocklist = BlockList(blocks, metadata, owned_by_consumer=owned_by_consumer) - - logical_plan = None - if all(ops_to_union): - logical_plan = LogicalPlan(UnionLogicalOperator(*ops_to_union)) else: tasks: List[ReadTask] = [] block_partition_refs: List[ObjectRef[BlockPartition]] = [] @@ -1664,16 +1650,6 @@ def union(self, *other: List["Dataset"]) -> "Dataset": owned_by_consumer=owned_by_consumer, ) - logical_plan = self._logical_plan - logical_plans = [ - getattr(union_ds, "_logical_plan", None) for union_ds in datasets - ] - if all(logical_plans): - op = UnionLogicalOperator( - *[plan.dag for plan in logical_plans], - ) - logical_plan = LogicalPlan(op) - epochs = [ds._get_epoch() for ds in datasets] max_epoch = max(*epochs) if len(set(epochs)) > 1: @@ -1693,7 +1669,6 @@ def union(self, *other: List["Dataset"]) -> "Dataset": ExecutionPlan(blocklist, stats, run_by_consumer=owned_by_consumer), max_epoch, self._lazy, - logical_plan, ) def groupby(self, key: Optional[str]) -> "GroupedData": diff --git a/python/ray/data/tests/test_bulk_executor.py b/python/ray/data/tests/test_bulk_executor.py index 01717a5f41b3..a7085639c40b 100644 --- a/python/ray/data/tests/test_bulk_executor.py +++ b/python/ray/data/tests/test_bulk_executor.py @@ -8,9 +8,7 @@ from ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.execution.bulk_executor import BulkExecutor from ray.data._internal.execution.interfaces import ExecutionOptions, RefBundle -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.util import make_ref_bundles diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 32e8bfe40e6e..f00676bc4105 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -6,14 +6,10 @@ import pytest import ray -from ray.data._internal.execution.interfaces import ExecutionOptions from ray.data._internal.execution.legacy_compat import _blocks_to_input_buffer -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.map_operator import MapOperator -from ray.data._internal.execution.operators.union_operator import UnionOperator from ray.data._internal.execution.operators.zip_operator import ZipOperator from ray.data._internal.logical.interfaces import LogicalPlan from ray.data._internal.logical.operators.all_to_all_operator import ( @@ -34,7 +30,7 @@ MapBatches, MapRows, ) -from ray.data._internal.logical.operators.n_ary_operator import Union, Zip +from ray.data._internal.logical.operators.n_ary_operator import Zip from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.logical.operators.write_operator import Write from ray.data._internal.logical.optimizers import PhysicalOptimizer @@ -46,8 +42,6 @@ from ray.data._internal.planner.planner import Planner from ray.data._internal.stats import DatasetStats from ray.data.aggregate import Count -from ray.data.datasource.datasource import RangeDatasource -from ray.data.datasource.json_datasource import JSONDatasource from ray.data.datasource.parquet_datasource import ParquetDatasource from ray.data.tests.conftest import * # noqa from ray.data.tests.util import column_udf, extract_values, named_values @@ -380,74 +374,6 @@ def _check_repartition_usage_and_stats(ds): _check_repartition_usage_and_stats(ds) -@pytest.mark.parametrize("preserve_order", (True, False)) -def test_union_operator(ray_start_regular_shared, enable_optimizer, preserve_order): - planner = Planner() - read_parquet_op = Read(ParquetDatasource(), []) - read_range_op = Read(RangeDatasource(), []) - read_json_op = Read(JSONDatasource(), []) - union_op = Union( - read_parquet_op, - read_range_op, - read_json_op, - ) - plan = LogicalPlan(union_op) - physical_op = planner.plan(plan).dag - - assert union_op.name == "Union" - assert isinstance(physical_op, UnionOperator) - assert len(physical_op.input_dependencies) == 3 - for input_op in physical_op.input_dependencies: - assert isinstance(input_op, MapOperator) - - -@pytest.mark.parametrize("preserve_order", (True, False)) -def test_union_e2e(ray_start_regular_shared, enable_optimizer, preserve_order): - execution_options = ExecutionOptions(preserve_order=preserve_order) - ctx = ray.data.DataContext.get_current() - ctx.execution_options = execution_options - - ds = ray.data.range(20, parallelism=10) - - # Test lazy union. - ds = ds.union(ds, ds, ds, ds) - assert ds.num_blocks() == 50 - assert ds.count() == 100 - assert ds.sum() == 950 - _check_usage_record(["ReadRange", "Union"]) - ds_result = [{"id": i} for i in range(20)] * 5 - if preserve_order: - assert ds.take_all() == ds_result - - ds = ds.union(ds) - assert ds.count() == 200 - assert ds.sum() == (950 * 2) - _check_usage_record(["ReadRange", "Union"]) - if preserve_order: - assert ds.take_all() == ds_result * 2 - - # Test materialized union. - ds2 = ray.data.from_items([{"id": i} for i in range(1, 5 + 1)]) - assert ds2.count() == 5 - assert ds2.sum() == 15 - _check_usage_record(["FromItems"]) - - ds2 = ds2.union(ds2) - assert ds2.count() == 10 - assert ds2.sum() == 30 - _check_usage_record(["FromItems", "Union"]) - ds2_result = ([{"id": i} for i in range(1, 5 + 1)]) * 2 - if preserve_order: - assert ds2.take_all() == ds2_result - - ds2 = ds2.union(ds) - assert ds2.count() == 210 - assert ds2.sum() == (950 * 2 + 30) - _check_usage_record(["FromItems", "Union"]) - if preserve_order: - assert ds2.take_all() == (ds2_result + ds_result * 2) - - def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that Read is fused with MapBatches. planner = Planner() diff --git a/python/ray/data/tests/test_executor_resource_management.py b/python/ray/data/tests/test_executor_resource_management.py index 90dc54ffda5b..c1a1a8787e2b 100644 --- a/python/ray/data/tests/test_executor_resource_management.py +++ b/python/ray/data/tests/test_executor_resource_management.py @@ -210,7 +210,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared): assert usage.object_store_memory == pytest.approx(2560, rel=0.5), usage # Indicate that no more inputs will arrive. - op.all_inputs_done() + op.inputs_done() # Wait until tasks are done. work_refs = op.get_work_refs() @@ -294,7 +294,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared): assert usage.object_store_memory == pytest.approx(3200, rel=0.5), usage # Indicate that no more inputs will arrive. - op.all_inputs_done() + op.inputs_done() # Wait until tasks are done. work_refs = op.get_work_refs() diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 752eaa9613a0..0c2d8697f63d 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -19,20 +19,17 @@ from ray.data._internal.execution.operators.actor_pool_map_operator import ( ActorPoolMapOperator, ) -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.limit_operator import LimitOperator from ray.data._internal.execution.operators.map_operator import ( MapOperator, _BlockRefBundler, ) +from ray.data._internal.execution.operators.one_to_one_operator import LimitOperator from ray.data._internal.execution.operators.output_splitter import OutputSplitter from ray.data._internal.execution.operators.task_pool_map_operator import ( TaskPoolMapOperator, ) -from ray.data._internal.execution.operators.union_operator import UnionOperator from ray.data._internal.execution.util import make_ref_bundles from ray.data.block import Block from ray.tests.conftest import * # noqa @@ -91,7 +88,7 @@ def dummy_all_transform(bundles: List[RefBundle], ctx): op.start(ExecutionOptions()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) - op.all_inputs_done() + op.inputs_done() # Check we return transformed bundles. assert not op.completed() @@ -146,7 +143,7 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors): assert op.internal_queue_size() == i else: assert op.internal_queue_size() == 0 - op.all_inputs_done() + op.inputs_done() work_refs = op.get_work_refs() while work_refs: for work_ref in work_refs: @@ -242,7 +239,7 @@ def test_split_operator(ray_start_regular_shared, equal, chunk_size): assert ref.owns_blocks, ref for block, _ in ref.blocks: output_splits[ref.output_split_idx].extend(list(ray.get(block)["id"])) - op.all_inputs_done() + op.inputs_done() if equal: for i in range(3): assert len(output_splits[i]) == 33 * chunk_size, output_splits @@ -269,7 +266,7 @@ def test_split_operator_random(ray_start_regular_shared, equal, random_seed): op.start(ExecutionOptions()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) - op.all_inputs_done() + op.inputs_done() while op.has_next(): ref = op.get_next() assert ref.owns_blocks, ref @@ -306,7 +303,7 @@ def get_bundle_loc(bundle): op.start(ExecutionOptions()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) - op.all_inputs_done() + op.inputs_done() while op.has_next(): ref = op.get_next() assert ref.owns_blocks, ref @@ -394,7 +391,7 @@ def _check_batch(block_iter: Iterable[Block], ctx) -> Iterable[Block]: op.start(ExecutionOptions()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) - op.all_inputs_done() + op.inputs_done() work_refs = op.get_work_refs() while work_refs: for work_ref in work_refs: @@ -439,7 +436,7 @@ def noop(block_iter: Iterable[Block], ctx) -> Iterable[Block]: assert len(inputs) == 10 for input_ in inputs: op.add_input(input_, 0) - op.all_inputs_done() + op.inputs_done() work_refs = op.get_work_refs() while work_refs: for work_ref in work_refs: @@ -474,7 +471,7 @@ def test_map_operator_ray_args(shutdown_only, use_actors): op.start(ExecutionOptions()) while input_op.has_next(): op.add_input(input_op.get_next(), 0) - op.all_inputs_done() + op.inputs_done() work_refs = op.get_work_refs() while work_refs: for work_ref in work_refs: @@ -605,7 +602,7 @@ def test_limit_operator(ray_start_regular_shared): refs = make_ref_bundles([[i] * num_rows_per_block for i in range(num_refs)]) input_op = InputDataBuffer(refs) limit_op = LimitOperator(limit, input_op) - limit_op.all_inputs_done = MagicMock(wraps=limit_op.all_inputs_done) + limit_op.inputs_done = MagicMock(wraps=limit_op.inputs_done) if limit == 0: # If the limit is 0, the operator should be completed immediately. assert limit_op.completed() @@ -627,16 +624,16 @@ def test_limit_operator(ray_start_regular_shared): limit_op.get_next() cur_rows += num_rows_per_block if cur_rows >= limit: - assert limit_op.all_inputs_done.call_count == 1, limit + assert limit_op.inputs_done.call_count == 1, limit assert limit_op.completed(), limit assert limit_op._limit_reached(), limit assert not limit_op.need_more_inputs(), limit else: - assert limit_op.all_inputs_done.call_count == 0, limit + assert limit_op.inputs_done.call_count == 0, limit assert not limit_op.completed(), limit assert not limit_op._limit_reached(), limit assert limit_op.need_more_inputs(), limit - limit_op.all_inputs_done() + limit_op.inputs_done() # After inputs done, the number of output bundles # should be the same as the number of `add_input`s. assert limit_op.num_outputs_total() == loop_count, limit @@ -650,81 +647,6 @@ def _get_bundles(bundle: RefBundle): return output -@pytest.mark.parametrize("preserve_order", (True, False)) -def test_union_operator(ray_start_regular_shared, preserve_order): - """Test basic functionalities of UnionOperator.""" - execution_options = ExecutionOptions(preserve_order=preserve_order) - ctx = ray.data.DataContext.get_current() - ctx.execution_options = execution_options - - num_rows_per_block = 3 - data0 = make_ref_bundles([[i] * num_rows_per_block for i in range(3)]) - data1 = make_ref_bundles([[i] * num_rows_per_block for i in range(2)]) - data2 = make_ref_bundles([[i] * num_rows_per_block for i in range(1)]) - - op0 = InputDataBuffer(data0) - op1 = InputDataBuffer(data1) - op2 = InputDataBuffer(data2) - union_op = UnionOperator(op0, op1, op2) - union_op.start(execution_options) - - assert not union_op.has_next() - union_op.add_input(op0.get_next(), 0) - assert union_op.has_next() - - assert union_op.get_next() == data0[0] - assert not union_op.has_next() - - union_op.add_input(op0.get_next(), 0) - union_op.add_input(op0.get_next(), 0) - assert union_op.get_next() == data0[1] - assert union_op.get_next() == data0[2] - - union_op.input_done(0) - assert not union_op.completed() - if preserve_order: - assert union_op._input_idx_to_output == 1 - - if preserve_order: - union_op.add_input(op1.get_next(), 1) - union_op.add_input(op2.get_next(), 2) - assert union_op._input_idx_to_output == 1 - - assert union_op.get_next() == data1[0] - assert not union_op.has_next() - - # Check the case where an input op which is not the op - # corresponding to _input_idx_to_output finishes first. - union_op.input_done(2) - assert union_op._input_idx_to_output == 1 - - union_op.add_input(op1.get_next(), 1) - assert union_op.has_next() - assert union_op.get_next() == data1[1] - assert not union_op.has_next() - # Marking the current output buffer source op will - # increment _input_idx_to_output to the next source. - union_op.input_done(1) - assert union_op._input_idx_to_output == 2 - assert union_op.has_next() - assert union_op.get_next() == data2[0] - else: - union_op.add_input(op1.get_next(), 1) - union_op.add_input(op2.get_next(), 2) - union_op.add_input(op1.get_next(), 1) - # The output will be in the same order as the inputs - # were added with `add_input()`. - assert union_op.get_next() == data1[0] - assert union_op.get_next() == data2[0] - assert union_op.get_next() == data1[1] - - assert all([len(b) == 0 for b in union_op._input_buffers]) - - _take_outputs(union_op) - union_op.all_inputs_done() - assert union_op.completed() - - @pytest.mark.parametrize( "target,in_bundles,expected_bundles", [ diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py index 79048dd4e1f2..7f26a6c742c7 100644 --- a/python/ray/data/tests/test_randomize_block_order.py +++ b/python/ray/data/tests/test_randomize_block_order.py @@ -1,9 +1,7 @@ import pytest import ray -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.logical.interfaces import LogicalPlan from ray.data._internal.logical.operators.all_to_all_operator import ( diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 87224660e9d7..04b093ae0796 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -25,7 +25,6 @@ build_streaming_topology, process_completed_tasks, select_operator_to_run, - update_operator_states, ) from ray.data._internal.execution.util import make_ref_bundles from ray.data.tests.conftest import * # noqa @@ -92,7 +91,6 @@ def test_process_completed_tasks(): # Test processing output bundles. assert len(topo[o1].outqueue) == 0, topo process_completed_tasks(topo) - update_operator_states(topo) assert len(topo[o1].outqueue) == 20, topo # Test processing completed work items. @@ -100,32 +98,29 @@ def test_process_completed_tasks(): done_ref = ray.put("done") o2.get_work_refs = MagicMock(return_value=[sleep_ref, done_ref]) o2.notify_work_completed = MagicMock() - o2.all_inputs_done = MagicMock() + o2.inputs_done = MagicMock() o1.all_dependents_complete = MagicMock() process_completed_tasks(topo) - update_operator_states(topo) o2.notify_work_completed.assert_called_once_with(done_ref) - o2.all_inputs_done.assert_not_called() + o2.inputs_done.assert_not_called() o1.all_dependents_complete.assert_not_called() # Test input finalization. o2.get_work_refs = MagicMock(return_value=[done_ref]) o2.notify_work_completed = MagicMock() - o2.all_inputs_done = MagicMock() + o2.inputs_done = MagicMock() o1.all_dependents_complete = MagicMock() o1.completed = MagicMock(return_value=True) topo[o1].outqueue.clear() process_completed_tasks(topo) - update_operator_states(topo) o2.notify_work_completed.assert_called_once_with(done_ref) - o2.all_inputs_done.assert_called_once() + o2.inputs_done.assert_called_once() o1.all_dependents_complete.assert_not_called() # Test dependents completed. o2.need_more_inputs = MagicMock(return_value=False) o1.all_dependents_complete = MagicMock() process_completed_tasks(topo) - update_operator_states(topo) o1.all_dependents_complete.assert_called_once() diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py index 16456e17941e..4846c5285b75 100644 --- a/python/ray/data/tests/test_streaming_integration.py +++ b/python/ray/data/tests/test_streaming_integration.py @@ -15,9 +15,7 @@ ExecutionResources, RefBundle, ) -from ray.data._internal.execution.operators.base_physical_operator import ( - AllToAllOperator, -) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.output_splitter import OutputSplitter