From c192455bae27f1004d9be77d790515d90deba979 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Sat, 20 May 2023 13:28:14 -0700 Subject: [PATCH 01/10] patch all Signed-off-by: Hao Chen --- .../data/_internal/execution/interfaces.py | 6 +- .../data/_internal/execution/legacy_compat.py | 6 +- .../operators/actor_pool_map_operator.py | 3 + .../logical/operators/input_data_operator.py | 21 +++++ .../logical/operators/read_operator.py | 10 +-- .../logical/rules/operator_fusion.py | 77 +++++++++++----- python/ray/data/_internal/logical/util.py | 1 + .../pull_based_shuffle_task_scheduler.py | 4 +- .../push_based_shuffle_task_scheduler.py | 5 +- .../split_repartition_task_scheduler.py | 39 ++++---- python/ray/data/_internal/planner/map_rows.py | 6 +- .../_internal/planner/plan_all_to_all_op.py | 2 +- .../_internal/planner/plan_from_arrow_op.py | 2 +- .../_internal/planner/plan_from_items_op.py | 2 +- .../_internal/planner/plan_from_numpy_op.py | 2 +- .../_internal/planner/plan_from_pandas_op.py | 2 +- .../_internal/planner/plan_input_data_op.py | 11 +++ .../data/_internal/planner/plan_limit_op.py | 19 ++++ .../data/_internal/planner/plan_read_op.py | 40 ++++++--- python/ray/data/_internal/planner/planner.py | 10 +++ .../data/_internal/planner/random_shuffle.py | 8 +- python/ray/data/context.py | 3 +- python/ray/data/dataset.py | 20 +++++ python/ray/data/read_api.py | 4 +- python/ray/data/tests/test_all_to_all.py | 2 +- .../data/tests/test_arrow_serialization.py | 8 +- .../data/tests/test_execution_optimizer.py | 45 +++++----- python/ray/data/tests/test_formats.py | 2 + python/ray/data/tests/test_map.py | 3 +- python/ray/data/tests/test_optimize.py | 22 +++-- .../data/tests/test_randomize_block_order.py | 8 +- python/ray/data/tests/test_stats.py | 90 +++++++++++++------ .../test_streaming_backpressure_edge_case.py | 2 - python/ray/data/tests/util.py | 1 + 34 files changed, 342 insertions(+), 144 deletions(-) create mode 100644 python/ray/data/_internal/logical/operators/input_data_operator.py create mode 100644 python/ray/data/_internal/planner/plan_input_data_op.py create mode 100644 python/ray/data/_internal/planner/plan_limit_op.py diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index eee7cf1c46dd..8cda45982d8c 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -1,6 +1,6 @@ from dataclasses import dataclass, field import os -from typing import Dict, List, Optional, Iterable, Iterator, Tuple, Callable, Union +from typing import Any, Dict, List, Optional, Iterable, Iterator, Tuple, Callable, Union import ray from ray.util.annotations import DeveloperAPI @@ -237,6 +237,10 @@ class TaskContext: # an AllToAllOperator with an upstream MapOperator. upstream_map_transform_fn: Optional["MapTransformFn"] = None + # The Ray remote arguments of the fused upstream MapOperator. + # This should be set if upstream_map_transform_fn is set. + upstream_map_ray_remote_args: Dict[str, Any] = None + # Block transform function applied by task and actor pools in MapOperator. 
MapTransformFn = Callable[[Iterable[Block], TaskContext], Iterable[Block]] diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index 907c368a2ffa..e5ed8651454b 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -136,7 +136,11 @@ def _get_execution_dag( record_operators_usage(plan._logical_plan.dag) # Get DAG of physical operators and input statistics. - if DataContext.get_current().optimizer_enabled: + if ( + DataContext.get_current().optimizer_enabled + # TODO(hchen): Remove this when all operators support local plan. + and getattr(plan, "_logical_plan", None) is not None + ): dag = get_execution_plan(plan._logical_plan).dag stats = _get_initial_stats_from_plan(plan) else: diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index bc55b0d9503d..85ae4471edbd 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -94,6 +94,9 @@ def __init__( self._inputs_done = False self._next_task_idx = 0 + def get_init_fn(self) -> Callable[[], None]: + return self._init_fn + def internal_queue_size(self) -> int: return len(self._bundle_queue) diff --git a/python/ray/data/_internal/logical/operators/input_data_operator.py b/python/ray/data/_internal/logical/operators/input_data_operator.py new file mode 100644 index 000000000000..6d9021570ab1 --- /dev/null +++ b/python/ray/data/_internal/logical/operators/input_data_operator.py @@ -0,0 +1,21 @@ +from typing import List, Optional, Callable + +from ray.data._internal.execution.interfaces import RefBundle +from ray.data._internal.logical.interfaces import LogicalOperator + + +class InputData(LogicalOperator): + """Logical operator for input data. + + This may hold cached blocks from a previous Dataset execution, or + the arguments for read tasks. 
+ """ + + def __init__( + self, + input_data: Optional[List[RefBundle]] = None, + input_data_factory: Callable[[], List[RefBundle]] = None, + ): + super().__init__("InputData", []) + self.input_data = input_data + self.input_data_factory = input_data_factory diff --git a/python/ray/data/_internal/logical/operators/read_operator.py b/python/ray/data/_internal/logical/operators/read_operator.py index 7d82fb7cc9b6..34f72763a4b1 100644 --- a/python/ray/data/_internal/logical/operators/read_operator.py +++ b/python/ray/data/_internal/logical/operators/read_operator.py @@ -1,7 +1,7 @@ -from typing import Any, Dict +from typing import List, Dict, Any from ray.data._internal.logical.operators.map_operator import AbstractMap -from ray.data.datasource.datasource import Datasource +from ray.data.datasource.datasource import ReadTask, Datasource class Read(AbstractMap): @@ -10,11 +10,9 @@ class Read(AbstractMap): def __init__( self, datasource: Datasource, - parallelism: int = -1, + read_tasks: List[ReadTask], ray_remote_args: Dict[str, Any] = None, - read_args: Dict[str, Any] = None, ): super().__init__("Read", None, ray_remote_args) self._datasource = datasource - self._parallelism = parallelism - self._read_args = read_args + self._read_tasks = read_tasks diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py index 41b61538bb5a..266bd77f19b4 100644 --- a/python/ray/data/_internal/logical/rules/operator_fusion.py +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -1,6 +1,13 @@ from typing import Iterator, List, Tuple + from ray.data._internal.logical.operators.all_to_all_operator import Repartition from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.actor_pool_map_operator import ( + ActorPoolMapOperator, +) +from ray.data._internal.execution.operators.task_pool_map_operator import ( + TaskPoolMapOperator, +) from ray.data._internal.logical.operators.all_to_all_operator import ( AbstractAllToAll, RandomShuffle, @@ -102,16 +109,22 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: the same class AND constructor args are the same for both. * They have compatible remote arguments. """ - from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.logical.operators.map_operator import AbstractMap from ray.data._internal.logical.operators.map_operator import AbstractUDFMap # We currently only support fusing for the following cases: - # - MapOperator -> MapOperator - # - MapOperator -> AllToAllOperator + # - TaskPoolMapOperator -> TaskPoolMapOperator/ActorPoolMapOperator + # - TaskPoolMapOperator -> AllToAllOperator # (only RandomShuffle and Repartition LogicalOperators are currently supported) - if not isinstance(down_op, (MapOperator, AllToAllOperator)) or not isinstance( - up_op, MapOperator + if not ( + ( + isinstance(up_op, TaskPoolMapOperator) + and isinstance(down_op, (TaskPoolMapOperator, ActorPoolMapOperator)) + ) + or ( + isinstance(up_op, TaskPoolMapOperator) + and isinstance(down_op, AllToAllOperator) + ) ): return False @@ -195,7 +208,11 @@ def _get_fused_map_operator( up_logical_op = self._op_map.pop(up_op) # Merge target block sizes. 
-        down_target_block_size = down_logical_op._target_block_size
+        down_target_block_size = (
+            down_logical_op._target_block_size
+            if isinstance(down_logical_op, AbstractUDFMap)
+            else None
+        )
         up_target_block_size = (
             up_logical_op._target_block_size
             if isinstance(up_logical_op, AbstractUDFMap)
@@ -219,10 +236,17 @@ def fused_map_transform_fn(
             # TODO(Scott): Add zero-copy batching between transform functions.
             return down_transform_fn(blocks, ctx)
 
+        # Fuse init functions.
+        fused_init_fn = (
+            down_op.get_init_fn() if isinstance(down_op, ActorPoolMapOperator) else None
+        )
+
         # We take the downstream op's compute in case we're fusing upstream tasks with a
         # downstream actor pool (e.g. read->map).
-        compute = get_compute(down_logical_op._compute)
-        ray_remote_args = down_logical_op._ray_remote_args
+        compute = None
+        if isinstance(down_logical_op, AbstractUDFMap):
+            compute = get_compute(down_logical_op._compute)
+        ray_remote_args = up_logical_op._ray_remote_args
         # Make the upstream operator's inputs the new, fused operator's inputs.
         input_deps = up_op.input_dependencies
         assert len(input_deps) == 1
@@ -233,6 +257,7 @@ def fused_map_transform_fn(
             fused_map_transform_fn,
             input_op,
             name=name,
+            init_fn=fused_init_fn,
             compute_strategy=compute,
             min_rows_per_bundle=target_block_size,
             ray_remote_args=ray_remote_args,
@@ -287,6 +312,7 @@ def _get_fused_all_to_all_operator(
         up_logical_op: AbstractUDFMap = self._op_map.pop(up_op)
 
         # Fuse transformation functions.
+        ray_remote_args = up_logical_op._ray_remote_args
         down_transform_fn = down_op.get_transformation_fn()
         up_transform_fn = up_op.get_transformation_fn()
 
@@ -297,9 +323,9 @@ def fused_all_to_all_transform_fn(
             in the TaskContext so that it may be used by the downstream
             AllToAllOperator's transform function."""
             ctx.upstream_map_transform_fn = up_transform_fn
+            ctx.upstream_map_ray_remote_args = ray_remote_args
             return down_transform_fn(blocks, ctx)
 
-        ray_remote_args = down_logical_op._ray_remote_args
         # Make the upstream operator's inputs the new, fused operator's inputs.
input_deps = up_op.input_dependencies assert len(input_deps) == 1 @@ -330,18 +356,29 @@ def fused_all_to_all_transform_fn( return op -def _are_remote_args_compatible(up_args, down_args): +def _are_remote_args_compatible(prev_args, next_args): """Check if Ray remote arguments are compatible for merging.""" - from ray.data._internal.execution.operators.map_operator import ( - _canonicalize_ray_remote_args, - ) - - up_args = _canonicalize_ray_remote_args(up_args) - down_args = _canonicalize_ray_remote_args(down_args) - remote_args = down_args.copy() + prev_args = _canonicalize(prev_args) + next_args = _canonicalize(next_args) + remote_args = next_args.copy() for key in INHERITABLE_REMOTE_ARGS: - if key in up_args: - remote_args[key] = up_args[key] - if up_args != remote_args: + if key in prev_args: + remote_args[key] = prev_args[key] + if prev_args != remote_args: return False return True + + +def _canonicalize(remote_args: dict) -> dict: + """Returns canonical form of given remote args.""" + remote_args = remote_args.copy() + if "num_cpus" not in remote_args or remote_args["num_cpus"] is None: + remote_args["num_cpus"] = 1 + if "num_gpus" not in remote_args or remote_args["num_gpus"] is None: + remote_args["num_gpus"] = 0 + resources = remote_args.get("resources", {}) + for k, v in list(resources.items()): + if v is None or v == 0.0: + del resources[k] + remote_args["resources"] = resources + return remote_args diff --git a/python/ray/data/_internal/logical/util.py b/python/ray/data/_internal/logical/util.py index cc7e0dc40cdb..57460c39f8e3 100644 --- a/python/ray/data/_internal/logical/util.py +++ b/python/ray/data/_internal/logical/util.py @@ -56,6 +56,7 @@ "Aggregate", # N-ary "Zip", + "Union", ] diff --git a/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py b/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py index fcfeaeb478fb..9d12bbaf2b65 100644 --- a/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py @@ -75,7 +75,9 @@ def execute( for j in range(output_num_blocks) ] - new_blocks, new_metadata = zip(*shuffle_reduce_out) + new_blocks, new_metadata = [], [] + if shuffle_reduce_out: + new_blocks, new_metadata = zip(*shuffle_reduce_out) new_metadata = reduce_bar.fetch_until_complete(list(new_metadata)) reduce_bar.close() diff --git a/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py b/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py index adf17c89d77c..17e375e1af07 100644 --- a/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py @@ -503,7 +503,10 @@ def merge(*args, **kwargs): for i, block in enumerate(new_blocks) ] sorted_blocks.sort(key=lambda x: x[0]) - _, new_blocks, reduce_stage_metadata = zip(*sorted_blocks) + + new_blocks, reduce_stage_metadata = [], [] + if sorted_blocks: + _, new_blocks, reduce_stage_metadata = zip(*sorted_blocks) del sorted_blocks assert ( diff --git a/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py b/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py index f0a2075e740d..b12e6b15b96c 100644 --- a/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py @@ 
-39,14 +39,19 @@ def execute( if not ref_bundle.owns_blocks: input_owned_by_consumer = False - # Compute the (output_num_blocks-1) indices needed for - # an equal split of the input blocks. + # Compute the (output_num_blocks) indices needed for an equal split of the + # input blocks. When output_num_blocks=1, the total number of + # input rows is used as the end index during the split calculation, + # so that we can combine all input blocks into a single output block. indices = [] - cur_idx = 0 - for _ in range(output_num_blocks - 1): - cur_idx += input_num_rows / output_num_blocks - indices.append(int(cur_idx)) - assert len(indices) < output_num_blocks, (indices, output_num_blocks) + if output_num_blocks == 1: + indices = [input_num_rows] + else: + cur_idx = 0 + for _ in range(output_num_blocks - 1): + cur_idx += input_num_rows / output_num_blocks + indices.append(int(cur_idx)) + assert len(indices) <= output_num_blocks, (indices, output_num_blocks) if map_ray_remote_args is None: map_ray_remote_args = {} @@ -59,19 +64,13 @@ def execute( blocks_with_metadata: List[Tuple[ObjectRef[Block], BlockMetadata]] = [] for ref_bundle in refs: blocks_with_metadata.extend(ref_bundle.blocks) - if indices: - split_return = _split_at_indices( - blocks_with_metadata, indices, input_owned_by_consumer - ) - split_block_refs, split_metadata = [], [] - for b, m in zip(*split_return): - split_block_refs.append(b) - split_metadata.extend(m) - else: - split_block_refs, split_metadata = [], [] - for b, m in blocks_with_metadata: - split_block_refs.append([b]) - split_metadata.append(m) + split_return = _split_at_indices( + blocks_with_metadata, indices, input_owned_by_consumer + ) + split_block_refs, split_metadata = [], [] + for b, m in zip(*split_return): + split_block_refs.append(b) + split_metadata.extend(m) reduce_bar = ProgressBar("Split Repartition", total=output_num_blocks) reduce_task = cached_remote_fn(self._exchange_spec.reduce) diff --git a/python/ray/data/_internal/planner/map_rows.py b/python/ray/data/_internal/planner/map_rows.py index 99405ff5ebf1..f9971efc51d8 100644 --- a/python/ray/data/_internal/planner/map_rows.py +++ b/python/ray/data/_internal/planner/map_rows.py @@ -8,9 +8,9 @@ from ray.data.context import DataContext -def generate_map_rows_fn() -> Callable[ - [Iterator[Block], TaskContext, UserDefinedFunction], Iterator[Block] -]: +def generate_map_rows_fn() -> ( + Callable[[Iterator[Block], TaskContext, UserDefinedFunction], Iterator[Block]] +): """Generate function to apply the UDF to each record of blocks.""" context = DataContext.get_current() diff --git a/python/ray/data/_internal/planner/plan_all_to_all_op.py b/python/ray/data/_internal/planner/plan_all_to_all_op.py index 985a5a88c403..1cef9ea99848 100644 --- a/python/ray/data/_internal/planner/plan_all_to_all_op.py +++ b/python/ray/data/_internal/planner/plan_all_to_all_op.py @@ -27,7 +27,7 @@ def _plan_all_to_all_op( if isinstance(op, RandomizeBlocks): fn = generate_randomize_blocks_fn(op._seed) elif isinstance(op, RandomShuffle): - fn = generate_random_shuffle_fn(op._seed, op._num_outputs) + fn = generate_random_shuffle_fn(op._seed, op._num_outputs, op._ray_remote_args) elif isinstance(op, Repartition): fn = generate_repartition_fn(op._num_outputs, op._shuffle) elif isinstance(op, Sort): diff --git a/python/ray/data/_internal/planner/plan_from_arrow_op.py b/python/ray/data/_internal/planner/plan_from_arrow_op.py index f84283e4f746..828a71ca37c9 100644 --- a/python/ray/data/_internal/planner/plan_from_arrow_op.py +++ 
b/python/ray/data/_internal/planner/plan_from_arrow_op.py @@ -22,7 +22,7 @@ def get_input_data() -> List[RefBundle]: get_metadata = cached_remote_fn(get_table_block_metadata) metadata = ray.get([get_metadata.remote(t) for t in op._tables]) ref_bundles: List[RefBundle] = [ - RefBundle([(table_ref, block_metadata)], owns_blocks=True) + RefBundle([(table_ref, block_metadata)], owns_blocks=False) for table_ref, block_metadata in zip(op._tables, metadata) ] return ref_bundles diff --git a/python/ray/data/_internal/planner/plan_from_items_op.py b/python/ray/data/_internal/planner/plan_from_items_op.py index 95507501bc02..35194948ee4c 100644 --- a/python/ray/data/_internal/planner/plan_from_items_op.py +++ b/python/ray/data/_internal/planner/plan_from_items_op.py @@ -48,7 +48,7 @@ def get_input_data() -> List[RefBundle]: ) block_ref_bundle = RefBundle( [(ray.put(block), block_metadata)], - owns_blocks=True, + owns_blocks=False, ) ref_bundles.append(block_ref_bundle) return ref_bundles diff --git a/python/ray/data/_internal/planner/plan_from_numpy_op.py b/python/ray/data/_internal/planner/plan_from_numpy_op.py index 969b4c26478b..ec172dcee62c 100644 --- a/python/ray/data/_internal/planner/plan_from_numpy_op.py +++ b/python/ray/data/_internal/planner/plan_from_numpy_op.py @@ -27,7 +27,7 @@ def get_input_data() -> List[RefBundle]: blocks, metadata = map(list, zip(*res)) metadata = ray.get(metadata) ref_bundles: List[RefBundle] = [ - RefBundle([(block, block_metadata)], owns_blocks=True) + RefBundle([(block, block_metadata)], owns_blocks=False) for block, block_metadata in zip(blocks, metadata) ] return ref_bundles diff --git a/python/ray/data/_internal/planner/plan_from_pandas_op.py b/python/ray/data/_internal/planner/plan_from_pandas_op.py index 096d1343c10d..6edaad5ecb92 100644 --- a/python/ray/data/_internal/planner/plan_from_pandas_op.py +++ b/python/ray/data/_internal/planner/plan_from_pandas_op.py @@ -64,7 +64,7 @@ def get_input_data() -> List[RefBundle]: get_table_block_metadata, ) - owns_blocks = True + owns_blocks = False if isinstance(op, FromDask): _init_data_from_dask(op) elif isinstance(op, FromModin): diff --git a/python/ray/data/_internal/planner/plan_input_data_op.py b/python/ray/data/_internal/planner/plan_input_data_op.py new file mode 100644 index 000000000000..e22971c1aec3 --- /dev/null +++ b/python/ray/data/_internal/planner/plan_input_data_op.py @@ -0,0 +1,11 @@ +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.logical.operators.input_data_operator import InputData + + +def _plan_input_data_op(op: InputData) -> PhysicalOperator: + """Get the corresponding DAG of physical operators for InputData.""" + + return InputDataBuffer( + input_data=op.input_data, input_data_factory=op.input_data_factory + ) diff --git a/python/ray/data/_internal/planner/plan_limit_op.py b/python/ray/data/_internal/planner/plan_limit_op.py new file mode 100644 index 000000000000..e341d63f10ee --- /dev/null +++ b/python/ray/data/_internal/planner/plan_limit_op.py @@ -0,0 +1,19 @@ +from typing import TYPE_CHECKING +from ray.data._internal.execution.operators.limit_operator import LimitOperator + + +if TYPE_CHECKING: + from ray.data._internal.logical.operators.limit_operator import Limit + from ray.data._internal.execution.interfaces import PhysicalOperator + + +def _plan_limit_op( + op: "Limit", input_physical_dag: "PhysicalOperator" +) -> "PhysicalOperator": + """Get the 
corresponding DAG of physical operators for Limit. + + Note this method only converts the given `op`, but not its input dependencies. + See Planner.plan() for more details. + """ + + return LimitOperator(op._limit, input_physical_dag) diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py index 56bc459fab56..be5d6400e7b8 100644 --- a/python/ray/data/_internal/planner/plan_read_op.py +++ b/python/ray/data/_internal/planner/plan_read_op.py @@ -10,9 +10,29 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.logical.operators.read_operator import Read -from ray.data.block import Block, BlockMetadata +from ray.data.block import Block from ray.data.datasource.datasource import ReadTask +TASK_SIZE_WARN_THRESHOLD_BYTES = 100000 + + +# Defensively compute the size of the block as the max size reported by the +# datasource and the actual read task size. This is to guard against issues +# with bad metadata reporting. +def cleaned_metadata(read_task): + block_meta = read_task.get_metadata() + task_size = len(cloudpickle.dumps(read_task)) + if block_meta.size_bytes is None or task_size > block_meta.size_bytes: + if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES: + print( + f"WARNING: the read task size ({task_size} bytes) is larger " + "than the reported output size of the task " + f"({block_meta.size_bytes} bytes). This may be a size " + "reporting bug in the datasource being read from." + ) + block_meta.size_bytes = task_size + return block_meta + def _plan_read_op(op: Read) -> PhysicalOperator: """Get the corresponding DAG of physical operators for Read. @@ -22,8 +42,7 @@ def _plan_read_op(op: Read) -> PhysicalOperator: """ def get_input_data() -> List[RefBundle]: - reader = op._datasource.create_reader(**op._read_args) - read_tasks = reader.get_read_tasks(op._parallelism) + read_tasks = op._read_tasks return [ RefBundle( [ @@ -31,13 +50,7 @@ def get_input_data() -> List[RefBundle]: # TODO(chengsu): figure out a better way to pass read # tasks other than ray.put(). 
ray.put(read_task), - BlockMetadata( - num_rows=1, - size_bytes=len(cloudpickle.dumps(read_task)), - schema=None, - input_files=[], - exec_stats=None, - ), + cleaned_metadata(read_task), ) ], owns_blocks=True, @@ -51,4 +64,9 @@ def do_read(blocks: Iterator[ReadTask], ctx: TaskContext) -> Iterator[Block]: for read_task in blocks: yield from read_task() - return MapOperator.create(do_read, inputs, name="DoRead") + return MapOperator.create( + do_read, + inputs, + name="DoRead", + ray_remote_args=op._ray_remote_args, + ) diff --git a/python/ray/data/_internal/planner/planner.py b/python/ray/data/_internal/planner/planner.py index 564761e35c74..01a90679be62 100644 --- a/python/ray/data/_internal/planner/planner.py +++ b/python/ray/data/_internal/planner/planner.py @@ -13,8 +13,10 @@ from ray.data._internal.logical.operators.from_items_operator import FromItems from ray.data._internal.logical.operators.from_numpy_operator import FromNumpyRefs from ray.data._internal.logical.operators.read_operator import Read +from ray.data._internal.logical.operators.input_data_operator import InputData from ray.data._internal.logical.operators.write_operator import Write from ray.data._internal.logical.operators.map_operator import AbstractUDFMap +from ray.data._internal.logical.operators.limit_operator import Limit from ray.data._internal.planner.plan_all_to_all_op import _plan_all_to_all_op from ray.data._internal.planner.plan_from_arrow_op import _plan_from_arrow_refs_op from ray.data._internal.planner.plan_from_items_op import _plan_from_items_op @@ -25,7 +27,9 @@ ) from ray.data._internal.planner.plan_udf_map_op import _plan_udf_map_op from ray.data._internal.planner.plan_read_op import _plan_read_op +from ray.data._internal.planner.plan_input_data_op import _plan_input_data_op from ray.data._internal.planner.plan_write_op import _plan_write_op +from ray.data._internal.planner.plan_limit_op import _plan_limit_op class Planner: @@ -52,6 +56,9 @@ def _plan(self, logical_op: LogicalOperator) -> PhysicalOperator: if isinstance(logical_op, Read): assert not physical_children physical_op = _plan_read_op(logical_op) + elif isinstance(logical_op, InputData): + assert not physical_children + physical_op = _plan_input_data_op(logical_op) elif isinstance(logical_op, Write): assert len(physical_children) == 1 physical_op = _plan_write_op(logical_op, physical_children[0]) @@ -78,6 +85,9 @@ def _plan(self, logical_op: LogicalOperator) -> PhysicalOperator: elif isinstance(logical_op, Zip): assert len(physical_children) == 2 physical_op = ZipOperator(physical_children[0], physical_children[1]) + elif isinstance(logical_op, Limit): + assert len(physical_children) == 1 + physical_op = _plan_limit_op(logical_op, physical_children[0]) else: raise ValueError( f"Found unknown logical operator during planning: {logical_op}" diff --git a/python/ray/data/_internal/planner/random_shuffle.py b/python/ray/data/_internal/planner/random_shuffle.py index 5827c34802d4..3ebb420c1932 100644 --- a/python/ray/data/_internal/planner/random_shuffle.py +++ b/python/ray/data/_internal/planner/random_shuffle.py @@ -35,8 +35,12 @@ def fn( # is applied to each block before shuffling. map_transform_fn: Optional[MapTransformFn] = ctx.upstream_map_transform_fn upstream_map_fn = None + _ray_remote_args = ray_remote_args if map_transform_fn: upstream_map_fn = lambda block: map_transform_fn(block, ctx) # noqa: E731 + # If there is a fused upstream operator, + # also use the ray_remote_args from the fused upstream operator. 
+            _ray_remote_args = ctx.upstream_map_ray_remote_args
 
         shuffle_spec = ShuffleTaskSpec(
             random_shuffle=True,
@@ -56,8 +60,8 @@ def fn(
         return scheduler.execute(
             refs,
             num_outputs or num_input_blocks,
-            map_ray_remote_args=ray_remote_args,
-            reduce_ray_remote_args=ray_remote_args,
+            map_ray_remote_args=_ray_remote_args,
+            reduce_ray_remote_args=_ray_remote_args,
         )
 
     return fn
diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index 519a1c3148e2..88b055aaa19f 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -99,7 +99,7 @@
 
 # Whether to enable optimizer.
 DEFAULT_OPTIMIZER_ENABLED = bool(
-    int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "0"))
+    int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "1"))
 )
 
 # Set this env var to enable distributed tqdm (experimental).
@@ -214,7 +214,6 @@ def get_current() -> "DataContext":
         global _default_context
 
         with _context_lock:
-
             if _default_context is None:
                 _default_context = DataContext(
                     block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED,
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 7596f7734ded..798442338b85 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -36,6 +36,7 @@
     Repartition,
     Sort,
 )
+from ray.data._internal.logical.operators.input_data_operator import InputData
 from ray.data._internal.logical.operators.n_ary_operator import Zip
 from ray.data._internal.logical.optimizers import LogicalPlan
 from ray.data._internal.logical.operators.limit_operator import Limit
@@ -3976,6 +3977,25 @@ def materialize(self) -> "MaterializedDataset":
         """
         copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset)
         copy._plan.execute(force_read=True)
+
+        blocks = copy._plan._snapshot_blocks
+
+        def get_input_data():
+            from ray.data._internal.execution.interfaces import RefBundle
+
+            if not blocks:
+                return []
+            else:
+                return [
+                    RefBundle(
+                        blocks=[block_with_metadata],
+                        owns_blocks=False,
+                    ) for block_with_metadata in blocks.get_blocks_with_metadata()
+                ]
+
+        # Create a new logical plan whose input is the existing data from the old Dataset.
+        copy._logical_plan = LogicalPlan(InputData(input_data_factory=get_input_data))
+
         return copy
 
     @ConsumptionAPI(pattern="timing information.", insert_after=True)
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 3cf4649c61fe..1e0cfe6cc98a 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -436,9 +436,7 @@ def read_datasource(
         owned_by_consumer=False,
     )
 
-    # TODO(chengsu): avoid calling Reader.get_read_tasks() twice after removing
-    # LazyBlockList code path.
-    read_op = Read(datasource, requested_parallelism, ray_remote_args, read_args)
+    read_op = Read(datasource, read_tasks, ray_remote_args)
     logical_plan = LogicalPlan(read_op)
 
     return Dataset(
diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py
index bda23abdd1e4..2d0573baab33 100644
--- a/python/ray/data/tests/test_all_to_all.py
+++ b/python/ray/data/tests/test_all_to_all.py
@@ -1730,7 +1730,7 @@ def test_random_shuffle_check_random(shutdown_only):
             prev = x
 
 
-def test_random_shuffle_with_custom_resource(ray_start_cluster):
+def test_random_shuffle_with_custom_resource(ray_start_cluster, use_push_based_shuffle):
     cluster = ray_start_cluster
     # Create two nodes which have different custom resources.
cluster.add_node( diff --git a/python/ray/data/tests/test_arrow_serialization.py b/python/ray/data/tests/test_arrow_serialization.py index 11b9b53dd9ff..7d7ee41d5479 100644 --- a/python/ray/data/tests/test_arrow_serialization.py +++ b/python/ray/data/tests/test_arrow_serialization.py @@ -61,8 +61,8 @@ def test_align_bit_offset_auto(): ) -@mock.patch("ray.data._internal.arrow_serialization._copy_normal_buffer_if_needed") -@mock.patch("ray.data._internal.arrow_serialization._copy_bitpacked_buffer_if_needed") +@mock.patch("ray._private.arrow_serialization._copy_normal_buffer_if_needed") +@mock.patch("ray._private.arrow_serialization._copy_bitpacked_buffer_if_needed") def test_copy_buffer_if_needed(mock_bitpacked, mock_normal): # Test that type-based buffer copy dispatch works as expected. bytes_ = b"abcd" @@ -519,11 +519,11 @@ def test_arrow_scalar_conversion(ray_start_regular_shared): ds = ray.data.from_items([1]) def fn(batch: list): - return np.array([1]) + return {"id": np.array([1])} ds = ds.map_batches(fn) res = ds.take() - assert res == [1], res + assert res == [{"id": 1}], res def test_custom_arrow_data_serializer_parquet_roundtrip( diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index c0e575b877e5..4b6f39c45ee5 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -73,7 +73,7 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] = def test_read_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - op = Read(ParquetDatasource()) + op = Read(ParquetDatasource(), []) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -107,7 +107,7 @@ def test_from_items_e2e(ray_start_regular_shared, enable_optimizer): def test_map_batches_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches( read_op, lambda x: x, @@ -130,7 +130,7 @@ def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer): def test_map_rows_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapRows( read_op, lambda x: x, @@ -153,7 +153,7 @@ def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer): def test_filter_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = Filter( read_op, lambda x: x, @@ -176,7 +176,7 @@ def test_filter_e2e(ray_start_regular_shared, enable_optimizer): def test_flat_map(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = FlatMap( read_op, lambda x: x, @@ -235,7 +235,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = RandomShuffle( read_op, seed=0, @@ -267,7 +267,7 @@ def test_random_shuffle_e2e( ) def test_repartition_operator(ray_start_regular_shared, enable_optimizer, shuffle): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = Repartition(read_op, num_outputs=5, shuffle=shuffle) plan = LogicalPlan(op) physical_op = 
planner.plan(plan).dag @@ -333,7 +333,7 @@ def _check_repartition_usage_and_stats(ds): def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that Read is fused with MapBatches. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches( read_op, lambda x: x, @@ -353,7 +353,7 @@ def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optim def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that a chain of different map operators are fused. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapRows(read_op, lambda x: x) op = MapBatches(op, lambda x: x) op = FlatMap(op, lambda x: x) @@ -377,6 +377,7 @@ def test_read_map_batches_operator_fusion_compatible_remote_args( planner = Planner() read_op = Read( ParquetDatasource(), + [], ray_remote_args={"num_cpus": 1, "scheduling_strategy": "SPREAD"}, ) op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 1}) @@ -398,7 +399,7 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args( ): # Test that map operators are not fused when remote args are incompatible. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 2}) op = MapBatches(op, lambda x: x, ray_remote_args={"num_cpus": 3}) logical_plan = LogicalPlan(op) @@ -423,7 +424,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors( # Test that a task-based map operator is fused into an actor-based map operator when # the former comes before the latter. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x) op = MapBatches(op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) @@ -443,7 +444,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors( ): # Test that reads fuse into an actor-based map operator. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) physical_plan = planner.plan(logical_plan) @@ -462,7 +463,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute( ): # Test that map operators are not fused when compute strategies are incompatible. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) op = MapBatches(op, lambda x: x) logical_plan = LogicalPlan(op) @@ -486,7 +487,7 @@ def test_read_map_batches_operator_fusion_target_block_size( # Test that fusion of map operators merges their block sizes in the expected way # (taking the max). planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, target_block_size=2) op = MapBatches(op, lambda x: x, target_block_size=5) op = MapBatches(op, lambda x: x, target_block_size=3) @@ -510,7 +511,7 @@ def test_read_map_batches_operator_fusion_callable_classes( ): # Test that callable classes can still be fused if they're the same function. 
planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) class UDF: def __call__(self, x): @@ -535,7 +536,7 @@ def test_read_map_batches_operator_fusion_incompatible_callable_classes( ): # Test that map operators are not fused when different callable classes are used. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) class UDF: def __call__(self, x): @@ -568,7 +569,7 @@ def test_read_map_batches_operator_fusion_incompatible_constructor_args( # Test that map operators are not fused when callable classes have different # constructor args. planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) class UDF: def __init__(self, a): @@ -786,7 +787,7 @@ def test_write_fusion(ray_start_regular_shared, enable_optimizer, tmp_path): def test_write_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() datasource = ParquetDatasource() - read_op = Read(datasource) + read_op = Read(datasource, []) op = Write( read_op, datasource, @@ -802,7 +803,7 @@ def test_write_operator(ray_start_regular_shared, enable_optimizer): def test_sort_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = Sort( read_op, key="col1", @@ -882,7 +883,7 @@ def test_sort_validate_keys( def test_aggregate_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource()) + read_op = Read(ParquetDatasource(), []) op = Aggregate( read_op, key="col1", @@ -952,8 +953,8 @@ def test_aggregate_validate_keys( def test_zip_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op1 = Read(ParquetDatasource()) - read_op2 = Read(ParquetDatasource()) + read_op1 = Read(ParquetDatasource(), []) + read_op2 = Read(ParquetDatasource(), []) op = Zip(read_op1, read_op2) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index 5790eff6a0a4..2925fa7b2d05 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -320,6 +320,8 @@ def test_read_s3_file_error(shutdown_only, s3_path): def test_get_read_tasks(shutdown_only): + # Note: if you get TimeoutErrors here, try installing required dependencies + # with `pip install -U "ray[default]"`. ray.init() head_node_id = ray.get_runtime_context().get_node_id() diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 4a7c2ca9cc0e..6b61a924d2e6 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -374,7 +374,7 @@ def test_map_batches_basic(ray_start_regular_shared, tmp_path, restore_data_cont def test_map_batches_extra_args(shutdown_only, tmp_path): ray.shutdown() - ray.init(num_cpus=2) + ray.init(num_cpus=3) def put(x): # We only support automatic deref in the legacy backend. 
@@ -875,6 +875,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5):
     ds2 = ray.data.range(2, parallelism=1)
     ds3 = ray.data.range(3, parallelism=1)
     # noinspection PyTypeChecker
+    # needs Union operator implemented to pass w/ optimizer
     ds = ds1.union(ds2).union(ds3)
     ensure_sample_size_close(ds)
     # Small datasets
diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py
index dd8643d433d9..9af8df705926 100644
--- a/python/ray/data/tests/test_optimize.py
+++ b/python/ray/data/tests/test_optimize.py
@@ -14,7 +14,7 @@
 from ray.data.context import DataContext
 from ray.data.datasource import Datasource, ReadTask
 from ray.data.datasource.csv_datasource import CSVDatasource
-from ray.data.tests.util import column_udf, extract_values
+from ray.data.tests.util import OPTIMIZER_ENABLED, column_udf, extract_values
 from ray.tests.conftest import *  # noqa
 
 
@@ -295,6 +295,7 @@ def _assert_has_stages(stages, stage_names):
         assert stage.name == name
 
 
+@pytest.mark.skipif(OPTIMIZER_ENABLED, reason="Deprecated with new optimizer path.")
 def test_stage_linking(ray_start_regular_shared):
     # Test lazy dataset.
     ds = ray.data.range(10).lazy()
@@ -308,10 +309,15 @@ def test_stage_linking(ray_start_regular_shared):
     ds = ds.materialize()
     _assert_has_stages(ds._plan._stages_before_snapshot, ["Map"])
     assert len(ds._plan._stages_after_snapshot) == 0
+    # Since the new optimizer path does not use the stage fusion path,
+    # but instead performs operator fusion, we expect _last_optimized_stages
+    # to not be updated (i.e., remain None)?
    _assert_has_stages(ds._plan._last_optimized_stages, ["ReadRange->Map"])
 
 
 def test_optimize_reorder(ray_start_regular_shared):
+    # The ReorderRandomizeBlocksRule optimizer rule collapses RandomizeBlocks operators,
+    # so we should not be fusing them to begin with.
     context = DataContext.get_current()
     context.optimize_fuse_stages = True
     context.optimize_fuse_read_stages = True
@@ -321,7 +327,7 @@ def test_optimize_reorder(ray_start_regular_shared):
     expect_stages(
         ds,
         2,
-        ["ReadRange->MapBatches(dummy_map)", "RandomizeBlockOrder"],
+        ["DoRead->MapBatches"],
     )
 
     ds2 = (
@@ -334,7 +340,7 @@ def test_optimize_reorder(ray_start_regular_shared):
     expect_stages(
         ds2,
         3,
-        ["ReadRange->RandomizeBlockOrder", "Repartition", "MapBatches(dummy_map)"],
+        ["DoRead", "Repartition", "MapBatches"],
     )
 
 
@@ -360,7 +366,7 @@ def test_write_fusion(ray_start_regular_shared, tmp_path):
     ds = ray.data.range(100).map_batches(lambda x: x)
     ds.write_csv(path)
     stats = ds._write_ds.stats()
-    assert "ReadRange->MapBatches()->Write" in stats, stats
+    assert "DoRead->MapBatches->Write" in stats, stats
 
     ds = (
         ray.data.range(100)
@@ -370,9 +376,9 @@ def test_write_fusion(ray_start_regular_shared, tmp_path):
     )
     ds.write_csv(path)
     stats = ds._write_ds.stats()
-    assert "ReadRange->MapBatches()" in stats, stats
+    assert "DoRead->MapBatches" in stats, stats
     assert "RandomShuffle" in stats, stats
-    assert "MapBatches()->Write" in stats, stats
+    assert "MapBatches->Write" in stats, stats
 
 
 def test_write_doesnt_reorder_randomize_block(ray_start_regular_shared, tmp_path):
@@ -383,9 +389,7 @@ def test_write_doesnt_reorder_randomize_block(ray_start_regular_shared, tmp_path
 
     # The randomize_block_order will switch order with the following map_batches,
     # but not the trailing write operator.
- assert "ReadRange->MapBatches()" in stats, stats - assert "RandomizeBlockOrder" in stats, stats - assert "Write" in stats, stats + assert "DoRead->MapBatches->Write" in stats, stats def test_optimize_fuse(ray_start_regular_shared): diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py index bccef7aaf331..3c744fc0c7c5 100644 --- a/python/ray/data/tests/test_randomize_block_order.py +++ b/python/ray/data/tests/test_randomize_block_order.py @@ -19,7 +19,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(datasource=None) + read_op = Read(datasource=None, []) op = RandomizeBlocks( read_op, seed=0, @@ -34,7 +34,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): def test_randomize_block_order_rule(): - read = Read(datasource=None) + read = Read(datasource=None, []) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=None) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -57,7 +57,7 @@ def test_randomize_block_order_rule(): def test_randomize_block_order_rule_seed(): - read = Read(datasource=None) + read = Read(datasource=None, []) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=2) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -84,7 +84,7 @@ def test_randomize_block_order_rule_seed(): def test_randomize_block_order_after_repartition(): - read = Read(datasource=None) + read = Read(datasource=None, []) operator1 = RandomizeBlocks(input_op=read) operator2 = Repartition(input_op=operator1, num_outputs=1, shuffle=False) operator3 = RandomizeBlocks(input_op=operator2) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index c148a5bd545d..a63bbcd4e51c 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -65,7 +65,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -79,7 +79,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( canonicalize(logger_args[0]) - == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -96,7 +96,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N Map: N/N blocks executed in T + == """Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -110,7 +110,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( canonicalize(logger_args[0]) - == """Stage N Map: N/N blocks executed in T + == """Stage N 
DoRead->MapBatches->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -127,7 +127,11 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.use_streaming_executor: assert ( stats - == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -137,7 +141,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} -Stage N Map: N/N blocks executed in T +Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -163,7 +167,11 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( stats - == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -173,7 +181,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} -Stage N Map: N/N blocks executed in T +Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -308,7 +316,7 @@ def test_dataset__repr__(ray_start_regular_shared): assert canonicalize(repr(ds2._plan.stats().to_summary())) == ( "DatasetStatsSummary(\n" " dataset_uuid=U,\n" - " base_name=MapBatches(),\n" + " base_name=DoRead->MapBatches,\n" " number=N,\n" " extra_metrics={\n" " obj_store_mem_alloc: N,\n" @@ -317,7 +325,7 @@ def test_dataset__repr__(ray_start_regular_shared): " },\n" " stage_stats=[\n" " StageStatsSummary(\n" - " stage_name='MapBatches()',\n" + " stage_name='DoRead->MapBatches',\n" " is_substage=False,\n" " time_total_s=T,\n" " block_execution_summary_str=N/N blocks executed in T\n" @@ -386,9 +394,13 @@ def test_dataset_stats_shuffle(ray_start_regular_shared): stats = canonicalize(ds.materialize().stats()) assert ( stats - == """Stage N ReadRange->RandomShuffle: executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total - Substage Z ReadRange->RandomShuffleMap: N/N blocks executed +Stage N DoRead->RandomShuffle: executed in T + + Substage Z DoRead->RandomShuffleMap: N/N blocks executed * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N 
max, N mean @@ -470,7 +482,11 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): if context.new_execution_backend: assert ( stats - == """Stage N ReadParquet->Map: N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -484,7 +500,11 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): else: assert ( stats - == """Stage N Read->Map: N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -506,7 +526,11 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path): if context.new_execution_backend: assert ( stats - == """Stage N ReadRange->Map: N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -538,7 +562,11 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path): else: assert ( stats - == """Stage N Read->Map: N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -597,7 +625,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -611,7 +639,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ else: assert ( canonicalize(logger_args[0]) - == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -629,7 +657,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N 
mean @@ -643,7 +671,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ else: assert ( canonicalize(logger_args[0]) - == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T + == """Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -693,7 +721,11 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ assert ( stats == """== Pipeline Window N == -Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T +Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapBatches: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -714,7 +746,9 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ 'obj_store_mem_peak': N} == Pipeline Window N == -Stage N ReadRange->MapBatches(dummy_map_batches): [execution cached] +Stage Z Read: [execution cached] + +Stage N DoRead->MapBatches: [execution cached] * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} @@ -729,7 +763,9 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ 'obj_store_mem_peak': N} == Pipeline Window N == -Stage N ReadRange->MapBatches(dummy_map_batches): [execution cached] +Stage Z Read: [execution cached] + +Stage N DoRead->MapBatches: [execution cached] * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} @@ -831,7 +867,7 @@ def test_dataset_pipeline_cache_cases(ray_start_regular_shared): ds.take(999) stats = ds.stats() assert "[execution cached]" in stats - assert "ReadRange->MapBatches(dummy_map_batches)" in stats + assert "DoRead->MapBatches" in stats def test_dataset_pipeline_split_stats_basic(ray_start_regular_shared): @@ -1081,7 +1117,11 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): stats = canonicalize(ds.stats()) assert ( stats - == """Stage N ReadRange->Map: N/N blocks executed in T + == """Stage Z Read: N/N blocks split from parent in T +* Output num rows: N min, N max, N mean, N total +* Output size bytes: N min, N max, N mean, N total + +Stage N DoRead->MapRows: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean diff --git a/python/ray/data/tests/test_streaming_backpressure_edge_case.py b/python/ray/data/tests/test_streaming_backpressure_edge_case.py index 2c4134c27165..1bc3e45efc8e 100644 --- a/python/ray/data/tests/test_streaming_backpressure_edge_case.py +++ b/python/ray/data/tests/test_streaming_backpressure_edge_case.py @@ -12,7 +12,6 @@ def test_input_backpressure_e2e(restore_data_context, shutdown_only): - # Tests that backpressure applies even when reading directly from the input # datasource. This relies on datasource metadata size estimation. 
@ray.remote @@ -73,7 +72,6 @@ def range_(i): def test_streaming_backpressure_e2e(restore_data_context): - # This test case is particularly challenging since there is a large input->output # increase in data size: https://github.com/ray-project/ray/issues/34041 class TestSlow: diff --git a/python/ray/data/tests/util.py b/python/ray/data/tests/util.py index 9c66784d19ef..cee2c904ed57 100644 --- a/python/ray/data/tests/util.py +++ b/python/ray/data/tests/util.py @@ -5,6 +5,7 @@ import ray STRICT_MODE = ray.data.DatasetContext.get_current().strict_mode +OPTIMIZER_ENABLED = ray.data.DatasetContext.get_current().optimizer_enabled @ray.remote From a98877fa66c1d37c86c924b34d991de241b2a58c Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 22 May 2023 11:47:22 -0700 Subject: [PATCH 02/10] revert some changes Signed-off-by: Hao Chen --- .../logical/operators/input_data_operator.py | 21 ----- .../logical/operators/read_operator.py | 10 ++- .../_internal/planner/plan_input_data_op.py | 11 --- python/ray/data/_internal/planner/planner.py | 5 -- python/ray/data/context.py | 3 +- python/ray/data/dataset.py | 20 ----- python/ray/data/read_api.py | 4 +- .../data/tests/test_execution_optimizer.py | 45 +++++----- python/ray/data/tests/test_map.py | 3 +- python/ray/data/tests/test_optimize.py | 22 ++--- .../data/tests/test_randomize_block_order.py | 8 +- python/ray/data/tests/test_stats.py | 90 ++++++------------- .../test_streaming_backpressure_edge_case.py | 2 + python/ray/data/tests/util.py | 1 - 14 files changed, 74 insertions(+), 171 deletions(-) delete mode 100644 python/ray/data/_internal/logical/operators/input_data_operator.py delete mode 100644 python/ray/data/_internal/planner/plan_input_data_op.py diff --git a/python/ray/data/_internal/logical/operators/input_data_operator.py b/python/ray/data/_internal/logical/operators/input_data_operator.py deleted file mode 100644 index 6d9021570ab1..000000000000 --- a/python/ray/data/_internal/logical/operators/input_data_operator.py +++ /dev/null @@ -1,21 +0,0 @@ -from typing import List, Optional, Callable - -from ray.data._internal.execution.interfaces import RefBundle -from ray.data._internal.logical.interfaces import LogicalOperator - - -class InputData(LogicalOperator): - """Logical operator for input data. - - This may hold cached blocks from a previous Dataset execution, or - the arguments for read tasks. 
- """ - - def __init__( - self, - input_data: Optional[List[RefBundle]] = None, - input_data_factory: Callable[[], List[RefBundle]] = None, - ): - super().__init__("InputData", []) - self.input_data = input_data - self.input_data_factory = input_data_factory diff --git a/python/ray/data/_internal/logical/operators/read_operator.py b/python/ray/data/_internal/logical/operators/read_operator.py index 34f72763a4b1..7d82fb7cc9b6 100644 --- a/python/ray/data/_internal/logical/operators/read_operator.py +++ b/python/ray/data/_internal/logical/operators/read_operator.py @@ -1,7 +1,7 @@ -from typing import List, Dict, Any +from typing import Any, Dict from ray.data._internal.logical.operators.map_operator import AbstractMap -from ray.data.datasource.datasource import ReadTask, Datasource +from ray.data.datasource.datasource import Datasource class Read(AbstractMap): @@ -10,9 +10,11 @@ class Read(AbstractMap): def __init__( self, datasource: Datasource, - read_tasks: List[ReadTask], + parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, + read_args: Dict[str, Any] = None, ): super().__init__("Read", None, ray_remote_args) self._datasource = datasource - self._read_tasks = read_tasks + self._parallelism = parallelism + self._read_args = read_args diff --git a/python/ray/data/_internal/planner/plan_input_data_op.py b/python/ray/data/_internal/planner/plan_input_data_op.py deleted file mode 100644 index e22971c1aec3..000000000000 --- a/python/ray/data/_internal/planner/plan_input_data_op.py +++ /dev/null @@ -1,11 +0,0 @@ -from ray.data._internal.execution.interfaces import PhysicalOperator -from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.logical.operators.input_data_operator import InputData - - -def _plan_input_data_op(op: InputData) -> PhysicalOperator: - """Get the corresponding DAG of physical operators for InputData.""" - - return InputDataBuffer( - input_data=op.input_data, input_data_factory=op.input_data_factory - ) diff --git a/python/ray/data/_internal/planner/planner.py b/python/ray/data/_internal/planner/planner.py index 01a90679be62..bb8d44d42233 100644 --- a/python/ray/data/_internal/planner/planner.py +++ b/python/ray/data/_internal/planner/planner.py @@ -13,7 +13,6 @@ from ray.data._internal.logical.operators.from_items_operator import FromItems from ray.data._internal.logical.operators.from_numpy_operator import FromNumpyRefs from ray.data._internal.logical.operators.read_operator import Read -from ray.data._internal.logical.operators.input_data_operator import InputData from ray.data._internal.logical.operators.write_operator import Write from ray.data._internal.logical.operators.map_operator import AbstractUDFMap from ray.data._internal.logical.operators.limit_operator import Limit @@ -27,7 +26,6 @@ ) from ray.data._internal.planner.plan_udf_map_op import _plan_udf_map_op from ray.data._internal.planner.plan_read_op import _plan_read_op -from ray.data._internal.planner.plan_input_data_op import _plan_input_data_op from ray.data._internal.planner.plan_write_op import _plan_write_op from ray.data._internal.planner.plan_limit_op import _plan_limit_op @@ -56,9 +54,6 @@ def _plan(self, logical_op: LogicalOperator) -> PhysicalOperator: if isinstance(logical_op, Read): assert not physical_children physical_op = _plan_read_op(logical_op) - elif isinstance(logical_op, InputData): - assert not physical_children - physical_op = _plan_input_data_op(logical_op) elif isinstance(logical_op, Write): assert 
len(physical_children) == 1 physical_op = _plan_write_op(logical_op, physical_children[0]) diff --git a/python/ray/data/context.py b/python/ray/data/context.py index 88b055aaa19f..519a1c3148e2 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -99,7 +99,7 @@ # Whether to enable optimizer. DEFAULT_OPTIMIZER_ENABLED = bool( - int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "1")) + int(os.environ.get("RAY_DATA_NEW_EXECUTION_OPTIMIZER", "0")) ) # Set this env var to enable distributed tqdm (experimental). @@ -214,6 +214,7 @@ def get_current() -> "DataContext": global _default_context with _context_lock: + if _default_context is None: _default_context = DataContext( block_splitting_enabled=DEFAULT_BLOCK_SPLITTING_ENABLED, diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 798442338b85..7596f7734ded 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -36,7 +36,6 @@ Repartition, Sort, ) -from ray.data._internal.logical.operators.input_data_operator import InputData from ray.data._internal.logical.operators.n_ary_operator import Zip from ray.data._internal.logical.optimizers import LogicalPlan from ray.data._internal.logical.operators.limit_operator import Limit @@ -3977,25 +3976,6 @@ def materialize(self) -> "MaterializedDataset": """ copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset) copy._plan.execute(force_read=True) - - blocks = copy._plan._snapshot_blocks - - def get_input_data(): - from ray.data._internal.execution.interfaces import RefBundle - - if not blocks: - return [] - else: - return [ - RefBundle( - blocks=[block_with_metadata], - owns_blocks=False, - ) for block_with_metadata in blocks.get_blocks_with_metadata() - ] - - # Create a new logical plan whose input is the existing data from the the old Dataset. - copy._logical_plan = LogicalPlan(InputData(input_data_factory=get_input_data)) - return copy @ConsumptionAPI(pattern="timing information.", insert_after=True) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 1e0cfe6cc98a..3cf4649c61fe 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -436,7 +436,9 @@ def read_datasource( owned_by_consumer=False, ) - read_op = Read(datasource, read_tasks, ray_remote_args) + # TODO(chengsu): avoid calling Reader.get_read_tasks() twice after removing + # LazyBlockList code path. 
+ read_op = Read(datasource, requested_parallelism, ray_remote_args, read_args) logical_plan = LogicalPlan(read_op) return Dataset( diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 4b6f39c45ee5..c0e575b877e5 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -73,7 +73,7 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] = def test_read_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - op = Read(ParquetDatasource(), []) + op = Read(ParquetDatasource()) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -107,7 +107,7 @@ def test_from_items_e2e(ray_start_regular_shared, enable_optimizer): def test_map_batches_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches( read_op, lambda x: x, @@ -130,7 +130,7 @@ def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer): def test_map_rows_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapRows( read_op, lambda x: x, @@ -153,7 +153,7 @@ def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer): def test_filter_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = Filter( read_op, lambda x: x, @@ -176,7 +176,7 @@ def test_filter_e2e(ray_start_regular_shared, enable_optimizer): def test_flat_map(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = FlatMap( read_op, lambda x: x, @@ -235,7 +235,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = RandomShuffle( read_op, seed=0, @@ -267,7 +267,7 @@ def test_random_shuffle_e2e( ) def test_repartition_operator(ray_start_regular_shared, enable_optimizer, shuffle): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = Repartition(read_op, num_outputs=5, shuffle=shuffle) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -333,7 +333,7 @@ def _check_repartition_usage_and_stats(ds): def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that Read is fused with MapBatches. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches( read_op, lambda x: x, @@ -353,7 +353,7 @@ def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optim def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that a chain of different map operators are fused. 
planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapRows(read_op, lambda x: x) op = MapBatches(op, lambda x: x) op = FlatMap(op, lambda x: x) @@ -377,7 +377,6 @@ def test_read_map_batches_operator_fusion_compatible_remote_args( planner = Planner() read_op = Read( ParquetDatasource(), - [], ray_remote_args={"num_cpus": 1, "scheduling_strategy": "SPREAD"}, ) op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 1}) @@ -399,7 +398,7 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args( ): # Test that map operators are not fused when remote args are incompatible. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 2}) op = MapBatches(op, lambda x: x, ray_remote_args={"num_cpus": 3}) logical_plan = LogicalPlan(op) @@ -424,7 +423,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors( # Test that a task-based map operator is fused into an actor-based map operator when # the former comes before the latter. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches(read_op, lambda x: x) op = MapBatches(op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) @@ -444,7 +443,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors( ): # Test that reads fuse into an actor-based map operator. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) physical_plan = planner.plan(logical_plan) @@ -463,7 +462,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute( ): # Test that map operators are not fused when compute strategies are incompatible. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) op = MapBatches(op, lambda x: x) logical_plan = LogicalPlan(op) @@ -487,7 +486,7 @@ def test_read_map_batches_operator_fusion_target_block_size( # Test that fusion of map operators merges their block sizes in the expected way # (taking the max). planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = MapBatches(read_op, lambda x: x, target_block_size=2) op = MapBatches(op, lambda x: x, target_block_size=5) op = MapBatches(op, lambda x: x, target_block_size=3) @@ -511,7 +510,7 @@ def test_read_map_batches_operator_fusion_callable_classes( ): # Test that callable classes can still be fused if they're the same function. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) class UDF: def __call__(self, x): @@ -536,7 +535,7 @@ def test_read_map_batches_operator_fusion_incompatible_callable_classes( ): # Test that map operators are not fused when different callable classes are used. planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) class UDF: def __call__(self, x): @@ -569,7 +568,7 @@ def test_read_map_batches_operator_fusion_incompatible_constructor_args( # Test that map operators are not fused when callable classes have different # constructor args. 
planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) class UDF: def __init__(self, a): @@ -787,7 +786,7 @@ def test_write_fusion(ray_start_regular_shared, enable_optimizer, tmp_path): def test_write_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() datasource = ParquetDatasource() - read_op = Read(datasource, []) + read_op = Read(datasource) op = Write( read_op, datasource, @@ -803,7 +802,7 @@ def test_write_operator(ray_start_regular_shared, enable_optimizer): def test_sort_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = Sort( read_op, key="col1", @@ -883,7 +882,7 @@ def test_sort_validate_keys( def test_aggregate_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), []) + read_op = Read(ParquetDatasource()) op = Aggregate( read_op, key="col1", @@ -953,8 +952,8 @@ def test_aggregate_validate_keys( def test_zip_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op1 = Read(ParquetDatasource(), []) - read_op2 = Read(ParquetDatasource(), []) + read_op1 = Read(ParquetDatasource()) + read_op2 = Read(ParquetDatasource()) op = Zip(read_op1, read_op2) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 6b61a924d2e6..4a7c2ca9cc0e 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -374,7 +374,7 @@ def test_map_batches_basic(ray_start_regular_shared, tmp_path, restore_data_cont def test_map_batches_extra_args(shutdown_only, tmp_path): ray.shutdown() - ray.init(num_cpus=3) + ray.init(num_cpus=2) def put(x): # We only support automatic deref in the legacy backend. @@ -875,7 +875,6 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): ds2 = ray.data.range(2, parallelism=1) ds3 = ray.data.range(3, parallelism=1) # noinspection PyTypeChecker - # needs Union operator implemented to pass w/ optimizer ds = ds1.union(ds2).union(ds3) ensure_sample_size_close(ds) # Small datasets diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index 9af8df705926..dd8643d433d9 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -14,7 +14,7 @@ from ray.data.context import DataContext from ray.data.datasource import Datasource, ReadTask from ray.data.datasource.csv_datasource import CSVDatasource -from ray.data.tests.util import OPTIMIZER_ENABLED, column_udf, extract_values +from ray.data.tests.util import column_udf, extract_values from ray.tests.conftest import * # noqa @@ -295,7 +295,6 @@ def _assert_has_stages(stages, stage_names): assert stage.name == name -@pytest.mark.skipif(OPTIMIZER_ENABLED, reason="Deprecated with new optimizer path.") def test_stage_linking(ray_start_regular_shared): # Test lazy dataset. ds = ray.data.range(10).lazy() @@ -309,15 +308,10 @@ def test_stage_linking(ray_start_regular_shared): ds = ds.materialize() _assert_has_stages(ds._plan._stages_before_snapshot, ["Map"]) assert len(ds._plan._stages_after_snapshot) == 0 - # since the new optimizer path does not use the stage fusion path, - # but instead performs operator fusion, we expect _last_optimized_stages - # to be not updated (None)? 
_assert_has_stages(ds._plan._last_optimized_stages, ["ReadRange->Map"]) def test_optimize_reorder(ray_start_regular_shared): - # The ReorderRandomizeBlocksRule optimizer rule collapses RandomizeBlocks operators, - # so we should not be fusing them to begin with. context = DataContext.get_current() context.optimize_fuse_stages = True context.optimize_fuse_read_stages = True @@ -327,7 +321,7 @@ def test_optimize_reorder(ray_start_regular_shared): expect_stages( ds, 2, - ["DoRead->MapBatches"], + ["ReadRange->MapBatches(dummy_map)", "RandomizeBlockOrder"], ) ds2 = ( @@ -340,7 +334,7 @@ def test_optimize_reorder(ray_start_regular_shared): expect_stages( ds2, 3, - ["DoRead", "Repartition", "MapBatches"], + ["ReadRange->RandomizeBlockOrder", "Repartition", "MapBatches(dummy_map)"], ) @@ -366,7 +360,7 @@ def test_write_fusion(ray_start_regular_shared, tmp_path): ds = ray.data.range(100).map_batches(lambda x: x) ds.write_csv(path) stats = ds._write_ds.stats() - assert "DoRead->MapBatches->Write" in stats, stats + assert "ReadRange->MapBatches()->Write" in stats, stats ds = ( ray.data.range(100) @@ -376,9 +370,9 @@ def test_write_fusion(ray_start_regular_shared, tmp_path): ) ds.write_csv(path) stats = ds._write_ds.stats() - assert "DoRead->MapBatches" in stats, stats + assert "ReadRange->MapBatches()" in stats, stats assert "RandomShuffle" in stats, stats - assert "MapBatches->Write" in stats, stats + assert "MapBatches()->Write" in stats, stats def test_write_doesnt_reorder_randomize_block(ray_start_regular_shared, tmp_path): @@ -389,7 +383,9 @@ def test_write_doesnt_reorder_randomize_block(ray_start_regular_shared, tmp_path # The randomize_block_order will switch order with the following map_batches, # but not the tailing write operator. - assert "DoRead->MapBatches->Write" in stats, stats + assert "ReadRange->MapBatches()" in stats, stats + assert "RandomizeBlockOrder" in stats, stats + assert "Write" in stats, stats def test_optimize_fuse(ray_start_regular_shared): diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py index 3c744fc0c7c5..bccef7aaf331 100644 --- a/python/ray/data/tests/test_randomize_block_order.py +++ b/python/ray/data/tests/test_randomize_block_order.py @@ -19,7 +19,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(datasource=None, []) + read_op = Read(datasource=None) op = RandomizeBlocks( read_op, seed=0, @@ -34,7 +34,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): def test_randomize_block_order_rule(): - read = Read(datasource=None, []) + read = Read(datasource=None) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=None) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -57,7 +57,7 @@ def test_randomize_block_order_rule(): def test_randomize_block_order_rule_seed(): - read = Read(datasource=None, []) + read = Read(datasource=None) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=2) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -84,7 +84,7 @@ def test_randomize_block_order_rule_seed(): def test_randomize_block_order_after_repartition(): - read = Read(datasource=None, []) + read = Read(datasource=None) operator1 = RandomizeBlocks(input_op=read) operator2 = Repartition(input_op=operator1, num_outputs=1, shuffle=False) operator3 = RandomizeBlocks(input_op=operator2) 
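For reference, a minimal sketch (not part of the patch) of the construction pattern the tests above follow after this revert: the Read logical operator is built directly from a datasource (with optional parallelism, ray_remote_args, and read_args) instead of a pre-computed read-task list, and the read tasks are instead created inside _plan_read_op's input-data callback via datasource.create_reader() (see the plan_read_op.py revert in PATCH 03). Import paths below mirror the modules touched in this series and should be treated as assumptions.

    from ray.data._internal.logical.operators.map_operator import MapBatches
    from ray.data._internal.logical.operators.read_operator import Read
    from ray.data._internal.logical.optimizers import LogicalPlan
    from ray.data._internal.planner.planner import Planner
    from ray.data.datasource.parquet_datasource import ParquetDatasource

    # Was Read(ParquetDatasource(), []) before this revert; no read-task list
    # is passed at construction time anymore.
    read_op = Read(ParquetDatasource())
    plan = LogicalPlan(MapBatches(read_op, lambda batch: batch))
    physical_op = Planner().plan(plan).dag
    # Before PhysicalOptimizer runs, physical_op is the MapOperator("MapBatches")
    # whose input is the MapOperator("DoRead") produced by _plan_read_op.
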
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index a63bbcd4e51c..c148a5bd545d 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -65,7 +65,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -79,7 +79,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -96,7 +96,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T + == """Stage N Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -110,7 +110,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T + == """Stage N Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -127,11 +127,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): if context.use_streaming_executor: assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -141,7 +137,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} -Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T +Stage N Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -167,11 +163,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): else: assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): 
N min, N max, N mean @@ -181,7 +173,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} -Stage N DoRead->MapBatches->MapRows: N/N blocks executed in T +Stage N Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -316,7 +308,7 @@ def test_dataset__repr__(ray_start_regular_shared): assert canonicalize(repr(ds2._plan.stats().to_summary())) == ( "DatasetStatsSummary(\n" " dataset_uuid=U,\n" - " base_name=DoRead->MapBatches,\n" + " base_name=MapBatches(),\n" " number=N,\n" " extra_metrics={\n" " obj_store_mem_alloc: N,\n" @@ -325,7 +317,7 @@ def test_dataset__repr__(ray_start_regular_shared): " },\n" " stage_stats=[\n" " StageStatsSummary(\n" - " stage_name='DoRead->MapBatches',\n" + " stage_name='MapBatches()',\n" " is_substage=False,\n" " time_total_s=T,\n" " block_execution_summary_str=N/N blocks executed in T\n" @@ -394,13 +386,9 @@ def test_dataset_stats_shuffle(ray_start_regular_shared): stats = canonicalize(ds.materialize().stats()) assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total + == """Stage N ReadRange->RandomShuffle: executed in T -Stage N DoRead->RandomShuffle: executed in T - - Substage Z DoRead->RandomShuffleMap: N/N blocks executed + Substage Z ReadRange->RandomShuffleMap: N/N blocks executed * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -482,11 +470,7 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): if context.new_execution_backend: assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapRows: N/N blocks executed in T + == """Stage N ReadParquet->Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -500,11 +484,7 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): else: assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapRows: N/N blocks executed in T + == """Stage N Read->Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -526,11 +506,7 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path): if context.new_execution_backend: assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapRows: N/N blocks executed in T + == """Stage N ReadRange->Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -562,11 +538,7 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path): else: assert ( stats - == """Stage Z Read: N/N blocks 
split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapRows: N/N blocks executed in T + == """Stage N Read->Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -625,7 +597,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -639,7 +611,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ else: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -657,7 +629,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ if context.new_execution_backend: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -671,7 +643,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ else: assert ( canonicalize(logger_args[0]) - == """Stage N DoRead->MapBatches: N/N blocks executed in T + == """Stage N Read->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -721,11 +693,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ assert ( stats == """== Pipeline Window N == -Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapBatches: N/N blocks executed in T +Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -746,9 +714,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ 'obj_store_mem_peak': N} == Pipeline Window N == -Stage Z Read: [execution cached] - -Stage N DoRead->MapBatches: [execution cached] +Stage N ReadRange->MapBatches(dummy_map_batches): [execution cached] * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 'obj_store_mem_peak': N} @@ -763,9 +729,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ 'obj_store_mem_peak': N} == Pipeline Window N == -Stage Z Read: [execution cached] - -Stage N DoRead->MapBatches: [execution cached] +Stage N ReadRange->MapBatches(dummy_map_batches): [execution cached] * Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ 
'obj_store_mem_peak': N} @@ -867,7 +831,7 @@ def test_dataset_pipeline_cache_cases(ray_start_regular_shared): ds.take(999) stats = ds.stats() assert "[execution cached]" in stats - assert "DoRead->MapBatches" in stats + assert "ReadRange->MapBatches(dummy_map_batches)" in stats def test_dataset_pipeline_split_stats_basic(ray_start_regular_shared): @@ -1117,11 +1081,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): stats = canonicalize(ds.stats()) assert ( stats - == """Stage Z Read: N/N blocks split from parent in T -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total - -Stage N DoRead->MapRows: N/N blocks executed in T + == """Stage N ReadRange->Map: N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean diff --git a/python/ray/data/tests/test_streaming_backpressure_edge_case.py b/python/ray/data/tests/test_streaming_backpressure_edge_case.py index 1bc3e45efc8e..2c4134c27165 100644 --- a/python/ray/data/tests/test_streaming_backpressure_edge_case.py +++ b/python/ray/data/tests/test_streaming_backpressure_edge_case.py @@ -12,6 +12,7 @@ def test_input_backpressure_e2e(restore_data_context, shutdown_only): + # Tests that backpressure applies even when reading directly from the input # datasource. This relies on datasource metadata size estimation. @ray.remote @@ -72,6 +73,7 @@ def range_(i): def test_streaming_backpressure_e2e(restore_data_context): + # This test case is particularly challenging since there is a large input->output # increase in data size: https://github.com/ray-project/ray/issues/34041 class TestSlow: diff --git a/python/ray/data/tests/util.py b/python/ray/data/tests/util.py index cee2c904ed57..9c66784d19ef 100644 --- a/python/ray/data/tests/util.py +++ b/python/ray/data/tests/util.py @@ -5,7 +5,6 @@ import ray STRICT_MODE = ray.data.DatasetContext.get_current().strict_mode -OPTIMIZER_ENABLED = ray.data.DatasetContext.get_current().optimizer_enabled @ray.remote From 5996f017b3df906ee708c20fdc0d09a40fdb858d Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 22 May 2023 11:56:46 -0700 Subject: [PATCH 03/10] revert plan_read_op.py Signed-off-by: Hao Chen --- .../data/_internal/planner/plan_read_op.py | 40 +++++-------------- 1 file changed, 11 insertions(+), 29 deletions(-) diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py index be5d6400e7b8..56bc459fab56 100644 --- a/python/ray/data/_internal/planner/plan_read_op.py +++ b/python/ray/data/_internal/planner/plan_read_op.py @@ -10,29 +10,9 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.logical.operators.read_operator import Read -from ray.data.block import Block +from ray.data.block import Block, BlockMetadata from ray.data.datasource.datasource import ReadTask -TASK_SIZE_WARN_THRESHOLD_BYTES = 100000 - - -# Defensively compute the size of the block as the max size reported by the -# datasource and the actual read task size. This is to guard against issues -# with bad metadata reporting. 
-def cleaned_metadata(read_task): - block_meta = read_task.get_metadata() - task_size = len(cloudpickle.dumps(read_task)) - if block_meta.size_bytes is None or task_size > block_meta.size_bytes: - if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES: - print( - f"WARNING: the read task size ({task_size} bytes) is larger " - "than the reported output size of the task " - f"({block_meta.size_bytes} bytes). This may be a size " - "reporting bug in the datasource being read from." - ) - block_meta.size_bytes = task_size - return block_meta - def _plan_read_op(op: Read) -> PhysicalOperator: """Get the corresponding DAG of physical operators for Read. @@ -42,7 +22,8 @@ def _plan_read_op(op: Read) -> PhysicalOperator: """ def get_input_data() -> List[RefBundle]: - read_tasks = op._read_tasks + reader = op._datasource.create_reader(**op._read_args) + read_tasks = reader.get_read_tasks(op._parallelism) return [ RefBundle( [ @@ -50,7 +31,13 @@ def get_input_data() -> List[RefBundle]: # TODO(chengsu): figure out a better way to pass read # tasks other than ray.put(). ray.put(read_task), - cleaned_metadata(read_task), + BlockMetadata( + num_rows=1, + size_bytes=len(cloudpickle.dumps(read_task)), + schema=None, + input_files=[], + exec_stats=None, + ), ) ], owns_blocks=True, @@ -64,9 +51,4 @@ def do_read(blocks: Iterator[ReadTask], ctx: TaskContext) -> Iterator[Block]: for read_task in blocks: yield from read_task() - return MapOperator.create( - do_read, - inputs, - name="DoRead", - ray_remote_args=op._ray_remote_args, - ) + return MapOperator.create(do_read, inputs, name="DoRead") From 758e3699b01aab70a8033cb38b341638da989cf4 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 22 May 2023 11:57:47 -0700 Subject: [PATCH 04/10] refine Signed-off-by: Hao Chen --- python/ray/data/_internal/planner/random_shuffle.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/planner/random_shuffle.py b/python/ray/data/_internal/planner/random_shuffle.py index 3ebb420c1932..3bc68ad4cc43 100644 --- a/python/ray/data/_internal/planner/random_shuffle.py +++ b/python/ray/data/_internal/planner/random_shuffle.py @@ -35,12 +35,12 @@ def fn( # is applied to each block before shuffling. map_transform_fn: Optional[MapTransformFn] = ctx.upstream_map_transform_fn upstream_map_fn = None - _ray_remote_args = ray_remote_args + nonlocal ray_remote_args if map_transform_fn: upstream_map_fn = lambda block: map_transform_fn(block, ctx) # noqa: E731 # If there is a fused upstream operator, # also use the ray_remote_args from the fused upstream operator. 
- _ray_remote_args = ctx.upstream_map_ray_remote_args + ray_remote_args = ctx.upstream_map_ray_remote_args shuffle_spec = ShuffleTaskSpec( random_shuffle=True, @@ -60,8 +60,8 @@ def fn( return scheduler.execute( refs, num_outputs or num_input_blocks, - map_ray_remote_args=_ray_remote_args, - reduce_ray_remote_args=_ray_remote_args, + map_ray_remote_args=ray_remote_args, + reduce_ray_remote_args=ray_remote_args, ) return fn From 6cd3c5ada3e16442246af6b93f9f79cf08f50d7d Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 22 May 2023 20:55:42 -0700 Subject: [PATCH 05/10] refine Signed-off-by: Hao Chen --- python/ray/data/_internal/plan.py | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 235247636973..a3848acfd8df 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -34,6 +34,7 @@ from ray.data._internal.dataset_logger import DatasetLogger from ray.data._internal.execution.interfaces import TaskContext from ray.data._internal.lazy_block_list import LazyBlockList +from ray.data._internal.logical.rules.operator_fusion import _are_remote_args_compatible from ray.data._internal.stats import DatasetStats, DatasetStatsSummary from ray.data.block import Block from ray.data.context import DataContext @@ -1266,34 +1267,6 @@ def _fuse_one_to_one_stages(stages: List[Stage]) -> List[Stage]: return fused_stages -def _are_remote_args_compatible(prev_args, next_args): - """Check if Ray remote arguments are compatible for merging.""" - prev_args = _canonicalize(prev_args) - next_args = _canonicalize(next_args) - remote_args = next_args.copy() - for key in INHERITABLE_REMOTE_ARGS: - if key in prev_args: - remote_args[key] = prev_args[key] - if prev_args != remote_args: - return False - return True - - -def _canonicalize(remote_args: dict) -> dict: - """Returns canonical form of given remote args.""" - remote_args = remote_args.copy() - if "num_cpus" not in remote_args or remote_args["num_cpus"] is None: - remote_args["num_cpus"] = 1 - if "num_gpus" not in remote_args or remote_args["num_gpus"] is None: - remote_args["num_gpus"] = 0 - resources = remote_args.get("resources", {}) - for k, v in list(resources.items()): - if v is None or v == 0.0: - del resources[k] - remote_args["resources"] = resources - return remote_args - - def _is_lazy(blocks: BlockList) -> bool: """Whether the provided block list is lazy.""" return isinstance(blocks, LazyBlockList) From 3432b741534a5234728d1863fbefdf096668bf81 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 23 May 2023 10:05:55 -0700 Subject: [PATCH 06/10] skip Signed-off-by: Hao Chen --- python/ray/data/tests/test_execution_optimizer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index c0e575b877e5..d8cedc760fc3 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -505,6 +505,10 @@ def test_read_map_batches_operator_fusion_target_block_size( assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) +# TODO(hchen): The old code path supports fusing 2 actors with the same class. +# But this doesn't seem useful in practice. Confirm whether we need this +# for the new code path. 
+@pytest.mark.skip("Optimizer doesn't supporting fusing two actors yet.") def test_read_map_batches_operator_fusion_callable_classes( ray_start_regular_shared, enable_optimizer ): From 1b9acf84eb4fbe49ee22cf115f48a1c11fdb0c67 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 24 May 2023 11:44:37 -0700 Subject: [PATCH 07/10] unit test Signed-off-by: Hao Chen --- .../data/_internal/execution/legacy_compat.py | 2 +- .../data/tests/test_execution_optimizer.py | 125 +++++++++++++----- 2 files changed, 90 insertions(+), 37 deletions(-) diff --git a/python/ray/data/_internal/execution/legacy_compat.py b/python/ray/data/_internal/execution/legacy_compat.py index e5ed8651454b..e469a7217c32 100644 --- a/python/ray/data/_internal/execution/legacy_compat.py +++ b/python/ray/data/_internal/execution/legacy_compat.py @@ -138,7 +138,7 @@ def _get_execution_dag( # Get DAG of physical operators and input statistics. if ( DataContext.get_current().optimizer_enabled - # TODO(hchen): Remove this when all operators support local plan. + # TODO(hchen): Remove this when all operators support logical plan. and getattr(plan, "_logical_plan", None) is not None ): dag = get_execution_plan(plan._logical_plan).dag diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index d8cedc760fc3..0ebabf099f26 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -374,47 +374,100 @@ def test_read_map_batches_operator_fusion_compatible_remote_args( ray_start_regular_shared, enable_optimizer ): # Test that map operators are stilled fused when remote args are compatible. - planner = Planner() - read_op = Read( - ParquetDatasource(), - ray_remote_args={"num_cpus": 1, "scheduling_strategy": "SPREAD"}, - ) - op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 1}) - op = MapBatches(op, lambda x: x, ray_remote_args={"num_cpus": 1}) - logical_plan = LogicalPlan(op) - physical_plan = planner.plan(logical_plan) - physical_plan = PhysicalOptimizer().optimize(physical_plan) - physical_op = physical_plan.dag - - assert op.name == "MapBatches" - assert physical_op.name == "DoRead->MapBatches->MapBatches" - assert isinstance(physical_op, MapOperator) - assert len(physical_op.input_dependencies) == 1 - assert isinstance(physical_op.input_dependencies[0], InputDataBuffer) + compatiple_remote_args_pairs = [ + # Empty remote args are compatible. + ({}, {}), + # Test `num_cpus` and `num_gpus`. + ({"num_cpus": 2}, {"num_cpus": 2}), + ({"num_gpus": 2}, {"num_gpus": 2}), + # `num_cpus` defaults to 1, `num_gpus` defaults to 0. + # The following 2 should be compatible. + ({"num_cpus": 1}, {}), + ({}, {"num_gpus": 0}), + # Test specifying custom resources. + ({"resources": {"custom": 1}}, {"resources": {"custom": 1}}), + ({"resources": {"custom": 0}}, {"resources": {}}), + # If the downstream op doesn't have `scheduling_strategy`, it will + # inherit from the upstream op. + ({"scheduling_strategy": "SPREAD"}, {}), + ] + for up_remote_args, down_remote_args in compatiple_remote_args_pairs: + planner = Planner() + read_op = Read( + ParquetDatasource(), + # This case is testing fusing the following 2 map_batches operators. + # So we add incompatible remote args to the read op to make sure + # it doesn't get fused. 
+ ray_remote_args={"resources": {"non-existent": 1}}, + ) + op = MapBatches(read_op, lambda x: x, ray_remote_args=up_remote_args) + op = MapBatches(op, lambda x: x, ray_remote_args=down_remote_args) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches", (up_remote_args, down_remote_args) + assert physical_op.name == "MapBatches->MapBatches", ( + up_remote_args, + down_remote_args, + ) + assert isinstance(physical_op, MapOperator), (up_remote_args, down_remote_args) + assert len(physical_op.input_dependencies) == 1, ( + up_remote_args, + down_remote_args, + ) + assert physical_op.input_dependencies[0].name == "DoRead", ( + up_remote_args, + down_remote_args, + ) def test_read_map_batches_operator_fusion_incompatible_remote_args( ray_start_regular_shared, enable_optimizer ): - # Test that map operators are not fused when remote args are incompatible. - planner = Planner() - read_op = Read(ParquetDatasource()) - op = MapBatches(read_op, lambda x: x, ray_remote_args={"num_cpus": 2}) - op = MapBatches(op, lambda x: x, ray_remote_args={"num_cpus": 3}) - logical_plan = LogicalPlan(op) - physical_plan = planner.plan(logical_plan) - physical_plan = PhysicalOptimizer().optimize(physical_plan) - physical_op = physical_plan.dag - - assert op.name == "MapBatches" - assert physical_op.name == "MapBatches" - assert isinstance(physical_op, MapOperator) - assert len(physical_op.input_dependencies) == 1 - upstream_physical_op = physical_op.input_dependencies[0] - assert isinstance(upstream_physical_op, MapOperator) - # Read shouldn't fuse into first MapBatches either, due to the differing CPU - # request. - assert upstream_physical_op.name == "MapBatches" + # Test that map operators won't get fused if the remote args are incompatible. + incompatiple_remote_args_pairs = [ + # Use different resources. + ({"num_cpus": 2}, {"num_gpus": 2}), + # Same resource, but different values. + ({"num_cpus": 3}, {"num_cpus": 2}), + # Incompatible custom resources. + ({"resources": {"custom": 2}}, {"resources": {"custom": 1}}), + ({"resources": {"custom1": 1}}, {"resources": {"custom2": 1}}), + # Different scheduling strategies. + ({"scheduling_strategy": "SPREAD"}, {"scheduing_strategy": "PACK"}), + ] + for up_remote_args, down_remote_args in incompatiple_remote_args_pairs: + planner = Planner() + read_op = Read( + ParquetDatasource(), + # This case is testing fusing the following 2 map_batches operators. + # So we add incompatible remote args to the read op to make sure + # it doesn't get fused. 
+ ray_remote_args={"resources": {"non-existent": 1}}, + ) + op = MapBatches(read_op, lambda x: x, ray_remote_args=up_remote_args) + op = MapBatches(op, lambda x: x, ray_remote_args=down_remote_args) + logical_plan = LogicalPlan(op) + physical_plan = planner.plan(logical_plan) + physical_plan = PhysicalOptimizer().optimize(physical_plan) + physical_op = physical_plan.dag + + assert op.name == "MapBatches", (up_remote_args, down_remote_args) + assert physical_op.name == "MapBatches", ( + up_remote_args, + down_remote_args, + ) + assert isinstance(physical_op, MapOperator), (up_remote_args, down_remote_args) + assert len(physical_op.input_dependencies) == 1, ( + up_remote_args, + down_remote_args, + ) + assert physical_op.input_dependencies[0].name == "MapBatches", ( + up_remote_args, + down_remote_args, + ) def test_read_map_batches_operator_fusion_compute_tasks_to_actors( From 385162ff64ec1eaf3f98fde753a8844614645e6d Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 30 May 2023 13:16:54 -0700 Subject: [PATCH 08/10] fix _plan_from_pandas_refs_op Signed-off-by: Hao Chen --- .../ray/data/_internal/planner/plan_from_pandas_op.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/planner/plan_from_pandas_op.py b/python/ray/data/_internal/planner/plan_from_pandas_op.py index 6edaad5ecb92..a066c1fa8d34 100644 --- a/python/ray/data/_internal/planner/plan_from_pandas_op.py +++ b/python/ray/data/_internal/planner/plan_from_pandas_op.py @@ -64,16 +64,23 @@ def get_input_data() -> List[RefBundle]: get_table_block_metadata, ) - owns_blocks = False - if isinstance(op, FromDask): + if isinstance(op, FromPandasRefs): + # Data is already put into the the Ray object store. + # So owns_blocks should be False. + owns_blocks = False + elif isinstance(op, FromDask): _init_data_from_dask(op) + owns_blocks = True elif isinstance(op, FromModin): _init_data_from_modin(op) + owns_blocks = True elif isinstance(op, FromMars): _init_data_from_mars(op) # MARS holds the MARS dataframe in memory in `to_ray_dataset()` # to avoid object GC, so this operator cannot not own the blocks. 
owns_blocks = False + else: + raise ValueError(f"Unsupported operator type: {type(op)}") context = DataContext.get_current() From 0d38d0c4445f73b9a85e4982ffefe8e5226ea459 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 30 May 2023 17:28:33 -0700 Subject: [PATCH 09/10] fix owns_blocks Signed-off-by: Hao Chen --- python/ray/data/_internal/planner/plan_from_items_op.py | 2 +- python/ray/data/_internal/planner/plan_from_numpy_op.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/planner/plan_from_items_op.py b/python/ray/data/_internal/planner/plan_from_items_op.py index 35194948ee4c..95507501bc02 100644 --- a/python/ray/data/_internal/planner/plan_from_items_op.py +++ b/python/ray/data/_internal/planner/plan_from_items_op.py @@ -48,7 +48,7 @@ def get_input_data() -> List[RefBundle]: ) block_ref_bundle = RefBundle( [(ray.put(block), block_metadata)], - owns_blocks=False, + owns_blocks=True, ) ref_bundles.append(block_ref_bundle) return ref_bundles diff --git a/python/ray/data/_internal/planner/plan_from_numpy_op.py b/python/ray/data/_internal/planner/plan_from_numpy_op.py index ec172dcee62c..969b4c26478b 100644 --- a/python/ray/data/_internal/planner/plan_from_numpy_op.py +++ b/python/ray/data/_internal/planner/plan_from_numpy_op.py @@ -27,7 +27,7 @@ def get_input_data() -> List[RefBundle]: blocks, metadata = map(list, zip(*res)) metadata = ray.get(metadata) ref_bundles: List[RefBundle] = [ - RefBundle([(block, block_metadata)], owns_blocks=False) + RefBundle([(block, block_metadata)], owns_blocks=True) for block, block_metadata in zip(blocks, metadata) ] return ref_bundles From f3d43be25eb622b16eede9c2872c157cf42d944f Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 30 May 2023 18:09:38 -0700 Subject: [PATCH 10/10] add note Signed-off-by: Hao Chen --- python/ray/data/_internal/execution/interfaces.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 8cda45982d8c..51744aa164af 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -233,6 +233,15 @@ class TaskContext: # TODO(chengsu): clean it up from TaskContext with new optimizer framework. sub_progress_bar_dict: Optional[Dict[str, ProgressBar]] = None + # NOTE(hchen): `upstream_map_transform_fn` and `upstream_map_ray_remote_args` + # are only used for `RandomShuffle`. DO NOT use them for other operators. + # Ideally, they should be handled by the optimizer, and should be transparent + # to the specific operators. + # But for `RandomShuffle`, the AllToAllOperator doesn't do the shuffle itself. + # It uses `ExchangeTaskScheduler` to launch new tasks to do the shuffle. + # That's why we need to pass them to `ExchangeTaskScheduler`. + # TODO(hchen): Use a physical operator to do the shuffle directly. + # The underlying function called in a MapOperator; this is used when fusing # an AllToAllOperator with an upstream MapOperator. upstream_map_transform_fn: Optional["MapTransformFn"] = None
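
For context, a small self-contained sketch (stub types, not Ray's actual classes) of how these two TaskContext fields are meant to be consumed on the shuffle path, mirroring the random_shuffle.py change in PATCH 04. Names such as ShuffleTaskContext and prepare_shuffle_map are illustrative stand-ins, not Ray APIs.

    from dataclasses import dataclass
    from typing import Any, Callable, Dict, Iterable, List, Optional

    Block = List[int]  # stand-in for ray.data.block.Block


    @dataclass
    class ShuffleTaskContext:  # simplified stand-in for the real TaskContext
        task_idx: int
        upstream_map_transform_fn: Optional[
            Callable[[Iterable[Block], "ShuffleTaskContext"], Iterable[Block]]
        ] = None
        upstream_map_ray_remote_args: Optional[Dict[str, Any]] = None


    def prepare_shuffle_map(ctx: ShuffleTaskContext, ray_remote_args: Dict[str, Any]):
        # Mirrors the random_shuffle.py hunk in PATCH 04: when a MapOperator was
        # fused into the shuffle, apply its transform fn to each block before
        # shuffling, and reuse its ray_remote_args for the shuffle tasks.
        upstream_map_fn = None
        if ctx.upstream_map_transform_fn is not None:
            transform = ctx.upstream_map_transform_fn
            upstream_map_fn = lambda blocks: transform(blocks, ctx)  # noqa: E731
            ray_remote_args = ctx.upstream_map_ray_remote_args
        return upstream_map_fn, ray_remote_args


    ctx = ShuffleTaskContext(
        task_idx=0,
        upstream_map_transform_fn=lambda blocks, _ctx: ([x * 2 for x in b] for b in blocks),
        upstream_map_ray_remote_args={"num_cpus": 2},
    )
    map_fn, remote_args = prepare_shuffle_map(ctx, {"num_cpus": 1})
    print(list(map_fn([[1, 2, 3]])))  # [[2, 4, 6]]
    print(remote_args)                # {'num_cpus': 2}

Carrying the fused operator's remote args alongside its transform fn keeps the shuffle map tasks requesting the same resources the fused map stage would have requested on its own, which is the intent of the NOTE above.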