diff --git a/doc/source/data/getting-started.rst b/doc/source/data/getting-started.rst index f33b089c341bd..6136564e3cb3a 100644 --- a/doc/source/data/getting-started.rst +++ b/doc/source/data/getting-started.rst @@ -66,7 +66,7 @@ transform datasets. Ray executes transformations in parallel for performance. .. testoutput:: MaterializedDataset( - num_blocks=..., + num_blocks=1, num_rows=150, schema={ sepal length (cm): double, @@ -147,7 +147,7 @@ or remote filesystems. import os - transformed_ds.repartition(1).write_parquet("/tmp/iris") + transformed_ds.write_parquet("/tmp/iris") print(os.listdir("/tmp/iris")) diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index cd2db277cc032..e430842e598fe 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -30,7 +30,7 @@ blocks need to be materialized in the cluster memory at once. Reading Data ============ -Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces one or more output blocks: +Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces an output block: .. image:: images/dataset-read.svg :align: center @@ -38,7 +38,7 @@ Dataset uses Ray tasks to read data from remote storage in parallel. Each read t .. https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit -You can increase or decrease the number of output blocks by changing the ``parallelism`` parameter. +You can manually specify the number of read tasks, but the final parallelism is always capped by the number of files in the underlying dataset. For an in-depth guide on creating datasets, read :ref:`Loading Data `. diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index d23eb3cfa334e..21c025108a956 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -135,8 +135,7 @@ By default, Ray Data automatically selects the read ``parallelism`` according to 4. The parallelism is truncated to ``min(num_files, parallelism)``. Occasionally, it is advantageous to manually tune the parallelism to optimize the application. This can be done when loading data via the ``parallelism`` parameter. -For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. Note that read tasks can produce multiple output -blocks per file in order to satisfy the requested parallelism. +For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created. Tuning Read Resources ~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/data/transforming-data.rst b/doc/source/data/transforming-data.rst index ddae72bbdc320..5ef6205d3b6f8 100644 --- a/doc/source/data/transforming-data.rst +++ b/doc/source/data/transforming-data.rst @@ -335,9 +335,9 @@ Repartitioning data A :class:`~ray.data.dataset.Dataset` operates on a sequence of distributed data :term:`blocks `. If you want to achieve more fine-grained parallelization, -increase the number of blocks by setting a higher ``parallelism`` at read time. +increase the number of blocks. -To change the number of blocks for an existing Dataset, call +To change the number of blocks, call :meth:`Dataset.repartition() `. .. 
testcode:: diff --git a/python/ray/air/tests/test_legacy_dataset_config.py b/python/ray/air/tests/test_legacy_dataset_config.py index 23344f71b0a61..9d0af345592b1 100644 --- a/python/ray/air/tests/test_legacy_dataset_config.py +++ b/python/ray/air/tests/test_legacy_dataset_config.py @@ -358,7 +358,7 @@ def checker(shard, results): assert len(results[0]) == 5, results assert results[0] != results[1], results stats = shard.stats() - assert "RandomizeBlockOrder:" in stats, stats + assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats ds = ray.data.range(5) test = TestStream( @@ -385,7 +385,7 @@ def checker(shard, results): # we eliminate the ordering in comparison. assert set(results[0]) == set(results[1]), results stats = shard.stats() - assert "RandomizeBlockOrder:" in stats, stats + assert "RandomizeBlockOrder: 5/5 blocks executed" in stats, stats ds = ray.data.range(5) test = TestBatch( @@ -400,7 +400,7 @@ def checker(shard, results): assert len(results[0]) == 5, results assert results[0] != results[1], results stats = shard.stats() - assert "RandomizeBlockOrder:" in stats, stats + assert "RandomizeBlockOrder: 5/5 blocks executed in" in stats, stats ds = ray.data.range(5) test = TestStream( diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index e98954fa4d20b..90b4b3caf24ee 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -289,14 +289,6 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) -py_test( - name = "test_splitblocks", - size = "medium", - srcs = ["tests/test_splitblocks.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - py_test( name = "test_execution_optimizer", size = "medium", diff --git a/python/ray/data/_internal/block_list.py b/python/ray/data/_internal/block_list.py index 4db4006052811..f90898e385263 100644 --- a/python/ray/data/_internal/block_list.py +++ b/python/ray/data/_internal/block_list.py @@ -32,9 +32,6 @@ def __init__( # Whether the block list is owned by consuming APIs, and if so it can be # eagerly deleted after read by the consumer. self._owned_by_consumer = owned_by_consumer - # This field can be set to indicate the number of estimated output blocks, - # since each read task may produce multiple output blocks after splitting. - self._estimated_num_blocks = None def __repr__(self): return f"BlockList(owned_by_consumer={self._owned_by_consumer})" @@ -220,10 +217,6 @@ def initial_num_blocks(self) -> int: """Returns the number of blocks of this BlockList.""" return self._num_blocks - def estimated_num_blocks(self) -> int: - """Estimate of `executed_num_blocks()`, without triggering actual execution.""" - return self._estimated_num_blocks or self._num_blocks - def executed_num_blocks(self) -> int: """Returns the number of output blocks after execution. diff --git a/python/ray/data/_internal/execution/operators/input_data_buffer.py b/python/ray/data/_internal/execution/operators/input_data_buffer.py index 0c1560b8c124c..41b542d2d614b 100644 --- a/python/ray/data/_internal/execution/operators/input_data_buffer.py +++ b/python/ray/data/_internal/execution/operators/input_data_buffer.py @@ -19,15 +19,12 @@ def __init__( self, input_data: Optional[List[RefBundle]] = None, input_data_factory: Callable[[], List[RefBundle]] = None, - num_output_blocks: Optional[int] = None, ): """Create an InputDataBuffer. Args: input_data: The list of bundles to output from this operator. input_data_factory: The factory to get input data, if input_data is None. - num_output_blocks: The number of output blocks. 
If not specified, progress - bars total will be set based on num output bundles instead. """ if input_data is not None: assert input_data_factory is None @@ -40,7 +37,6 @@ def __init__( assert input_data_factory is not None self._input_data_factory = input_data_factory self._is_input_initialized = False - self._num_output_blocks = num_output_blocks super().__init__("Input", []) def start(self, options: ExecutionOptions) -> None: @@ -57,7 +53,7 @@ def get_next(self) -> RefBundle: return self._input_data.pop(0) def num_outputs_total(self) -> Optional[int]: - return self._num_output_blocks or self._num_output_bundles + return self._num_outputs def get_stats(self) -> StatsDict: return {} @@ -68,7 +64,7 @@ def add_input(self, refs, input_index) -> None: def _initialize_metadata(self): assert self._input_data is not None and self._is_input_initialized - self._num_output_bundles = len(self._input_data) + self._num_outputs = len(self._input_data) block_metadata = [] for bundle in self._input_data: block_metadata.extend([m for (_, m) in bundle.blocks]) diff --git a/python/ray/data/_internal/logical/operators/read_operator.py b/python/ray/data/_internal/logical/operators/read_operator.py index a1a94b4e7b04c..2755c8b446aa2 100644 --- a/python/ray/data/_internal/logical/operators/read_operator.py +++ b/python/ray/data/_internal/logical/operators/read_operator.py @@ -11,23 +11,8 @@ def __init__( self, datasource: Datasource, read_tasks: List[ReadTask], - estimated_num_blocks: int, ray_remote_args: Optional[Dict[str, Any]] = None, ): - if len(read_tasks) == estimated_num_blocks: - suffix = "" - else: - suffix = f"->SplitBlocks({int(estimated_num_blocks / len(read_tasks))})" - super().__init__(f"Read{datasource.get_name()}{suffix}", None, ray_remote_args) + super().__init__(f"Read{datasource.get_name()}", None, ray_remote_args) self._datasource = datasource - self._estimated_num_blocks = estimated_num_blocks self._read_tasks = read_tasks - - def fusable(self) -> bool: - """Whether this should be fused with downstream operators. - - When we are outputting multiple blocks per read task, we should disable fusion, - as fusion would prevent the blocks from being dispatched to multiple processes - for parallel processing in downstream operators. - """ - return self._estimated_num_blocks == len(self._read_tasks) diff --git a/python/ray/data/_internal/logical/rules/operator_fusion.py b/python/ray/data/_internal/logical/rules/operator_fusion.py index 613eb0b507d25..5ac00a7cdb449 100644 --- a/python/ray/data/_internal/logical/rules/operator_fusion.py +++ b/python/ray/data/_internal/logical/rules/operator_fusion.py @@ -24,7 +24,6 @@ Repartition, ) from ray.data._internal.logical.operators.map_operator import AbstractUDFMap -from ray.data._internal.logical.operators.read_operator import Read from ray.data._internal.stats import StatsDict from ray.data.block import Block @@ -133,9 +132,6 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool: down_logical_op = self._op_map[down_op] up_logical_op = self._op_map[up_op] - if isinstance(up_logical_op, Read) and not up_logical_op.fusable(): - return False - # If the downstream operator takes no input, it cannot be fused with # the upstream operator. 
if not down_logical_op._input_dependencies: diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index e61f356afd4a7..669729dcb6e59 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -221,7 +221,7 @@ def get_plan_as_string(self, classname: str) -> str: if dataset_blocks is None: num_blocks = "?" else: - num_blocks = dataset_blocks.estimated_num_blocks() + num_blocks = dataset_blocks.initial_num_blocks() dataset_str = "{}(num_blocks={}, num_rows={}, schema={})".format( classname, num_blocks, count, schema_str ) diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py index 086d69eeb0142..fb3695f782379 100644 --- a/python/ray/data/_internal/planner/plan_read_op.py +++ b/python/ray/data/_internal/planner/plan_read_op.py @@ -58,9 +58,7 @@ def get_input_data() -> List[RefBundle]: for read_task in read_tasks ] - inputs = InputDataBuffer( - input_data_factory=get_input_data, num_output_blocks=op._estimated_num_blocks - ) + inputs = InputDataBuffer(input_data_factory=get_input_data) def do_read(blocks: Iterator[ReadTask], _: TaskContext) -> Iterator[Block]: for read_task in blocks: diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py index cfe9c16c8d551..10139dc544144 100644 --- a/python/ray/data/_internal/util.py +++ b/python/ray/data/_internal/util.py @@ -90,7 +90,7 @@ def _autodetect_parallelism( ctx: DataContext, reader: Optional["Reader"] = None, avail_cpus: Optional[int] = None, -) -> (int, int, Optional[int]): +) -> (int, int): """Returns parallelism to use and the min safe parallelism to avoid OOMs. This detects parallelism using the following heuristics, applied in order: @@ -111,9 +111,8 @@ def _autodetect_parallelism( avail_cpus: Override avail cpus detection (for testing only). Returns: - Tuple of detected parallelism (only if -1 was specified), the min safe - parallelism (which can be used to generate warnings about large blocks), - and the estimated inmemory size of the dataset. + Tuple of detected parallelism (only if -1 was specified), and the min safe + parallelism (which can be used to generate warnings about large blocks). """ min_safe_parallelism = 1 max_reasonable_parallelism = sys.maxsize @@ -141,7 +140,7 @@ def _autodetect_parallelism( f"estimated_available_cpus={avail_cpus} and " f"estimated_data_size={mem_size}." ) - return parallelism, min_safe_parallelism, mem_size + return parallelism, min_safe_parallelism def _estimate_avail_cpus(cur_pg: Optional["PlacementGroup"]) -> int: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 8d88bdcfc4731..091d247825b4e 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -480,7 +480,7 @@ def map_batches( >>> ds = ds.map_batches(map_fn_with_large_output) >>> ds MapBatches(map_fn_with_large_output) - +- Dataset(num_blocks=..., num_rows=1, schema={item: int64}) + +- Dataset(num_blocks=1, num_rows=1, schema={item: int64}) Args: @@ -753,7 +753,7 @@ def select_columns( >>> ds MapBatches() +- Dataset( - num_blocks=..., + num_blocks=10, num_rows=10, schema={col1: int64, col2: int64, col3: int64} ) @@ -1698,7 +1698,7 @@ def groupby(self, key: Optional[str]) -> "GroupedData": ... {"A": x % 3, "B": x} for x in range(100)]).groupby( ... 
"A").count() Aggregate - +- Dataset(num_blocks=..., num_rows=100, schema={A: int64, B: int64}) + +- Dataset(num_blocks=100, num_rows=100, schema={A: int64, B: int64}) Time complexity: O(dataset size * log(dataset size / parallelism)) @@ -3287,7 +3287,7 @@ def to_tf( >>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv") >>> ds Dataset( - num_blocks=..., + num_blocks=1, num_rows=150, schema={ sepal length (cm): double, @@ -3318,7 +3318,7 @@ def to_tf( >>> ds Concatenator +- Dataset( - num_blocks=..., + num_blocks=1, num_rows=150, schema={ sepal length (cm): double, diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index a174561f9802e..d251009237820 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -195,7 +195,6 @@ class ReadTask(Callable[[], Iterable[Block]]): def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata): self._metadata = metadata self._read_fn = read_fn - self._additional_output_splits = 1 def get_metadata(self) -> BlockMetadata: return self._metadata @@ -212,27 +211,13 @@ def __call__(self) -> Iterable[Block]: if context.block_splitting_enabled: for block in result: - yield from self._do_additional_splits(block) + yield block else: builder = DelegatingBlockBuilder() for block in result: builder.add_block(block) yield builder.build() - def _set_additional_split_factor(self, k: int) -> None: - self._additional_output_splits = k - - def _do_additional_splits(self, block: Block) -> Iterable[Block]: - if self._additional_output_splits > 1: - block = BlockAccessor.for_block(block) - offset = 0 - split_sizes = _splitrange(block.num_rows(), self._additional_output_splits) - for size in split_sizes: - yield block.slice(offset, offset + size, copy=True) - offset += size - else: - yield block - @PublicAPI class RangeDatasource(Datasource): @@ -493,21 +478,3 @@ def make_block(count: int, num_columns: int) -> Block: i += block_size return read_tasks - - -def _splitrange(n, k): - """Calculates array lens of np.array_split(). - - This is the equivalent of - `[len(x) for x in np.array_split(range(n), k)]`. - """ - base = n // k - output = [base] * k - rem = n - sum(output) - for i in range(len(output)): - if rem > 0: - output[i] += 1 - rem -= 1 - assert rem == 0, (rem, output, n, k) - assert sum(output) == n, (output, n, k) - return output diff --git a/python/ray/data/datasource/parquet_datasource.py b/python/ray/data/datasource/parquet_datasource.py index 8b950a9f4bfce..db74bb2722ba1 100644 --- a/python/ray/data/datasource/parquet_datasource.py +++ b/python/ray/data/datasource/parquet_datasource.py @@ -288,25 +288,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]: if meta.size_bytes is not None: meta.size_bytes = int(meta.size_bytes * self._encoding_ratio) - - if meta.num_rows and meta.size_bytes: - # Make sure the batches read are small enough to enable yielding of - # output blocks incrementally during the read. - row_size = meta.size_bytes / meta.num_rows - # Make sure the row batch size is small enough that block splitting - # is still effective. 
- max_parquet_reader_row_batch_size = ( - DataContext.get_current().target_max_block_size // 10 - ) - default_read_batch_size = max( - 1, - min( - PARQUET_READER_ROW_BATCH_SIZE, - max_parquet_reader_row_batch_size // row_size, - ), - ) - else: - default_read_batch_size = PARQUET_READER_ROW_BATCH_SIZE block_udf, reader_args, columns, schema = ( self._block_udf, self._reader_args, @@ -318,7 +299,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]: lambda p=serialized_pieces: _read_pieces( block_udf, reader_args, - default_read_batch_size, columns, schema, p, @@ -383,12 +363,7 @@ def _estimate_files_encoding_ratio(self) -> float: def _read_pieces( - block_udf, - reader_args, - default_read_batch_size, - columns, - schema, - serialized_pieces: List[_SerializedPiece], + block_udf, reader_args, columns, schema, serialized_pieces: List[_SerializedPiece] ) -> Iterator["pyarrow.Table"]: # This import is necessary to load the tensor extension type. from ray.data.extensions.tensor_extension import ArrowTensorType # noqa @@ -412,7 +387,7 @@ def _read_pieces( logger.debug(f"Reading {len(pieces)} parquet pieces") use_threads = reader_args.pop("use_threads", False) - batch_size = reader_args.pop("batch_size", default_read_batch_size) + batch_size = reader_args.pop("batch_size", PARQUET_READER_ROW_BATCH_SIZE) for piece in pieces: part = _get_partition_keys(piece.partition_expression) batches = piece.to_batches( diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index 4b63432a6e442..3e80bab15900a 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -48,9 +48,9 @@ class DataIterator(abc.ABC): >>> import ray >>> ds = ray.data.range(5) >>> ds - Dataset(num_blocks=..., num_rows=5, schema={id: int64}) + Dataset(num_blocks=5, num_rows=5, schema={id: int64}) >>> ds.iterator() - DataIterator(Dataset(num_blocks=..., num_rows=5, schema={id: int64})) + DataIterator(Dataset(num_blocks=5, num_rows=5, schema={id: int64})) .. tip:: For debugging purposes, use @@ -635,7 +635,7 @@ def to_tf( ... 
) >>> it = ds.iterator(); it DataIterator(Dataset( - num_blocks=..., + num_blocks=1, num_rows=150, schema={ sepal length (cm): double, @@ -666,7 +666,7 @@ def to_tf( >>> it DataIterator(Concatenator +- Dataset( - num_blocks=..., + num_blocks=1, num_rows=150, schema={ sepal length (cm): double, diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index d1649c9c601c8..ed11093a2eff2 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1,6 +1,5 @@ import collections import logging -import math from typing import ( TYPE_CHECKING, Any, @@ -137,7 +136,7 @@ def from_items( if parallelism == 0: raise ValueError(f"parallelism must be -1 or > 0, got: {parallelism}") - detected_parallelism, _, _ = _autodetect_parallelism( + detected_parallelism, _ = _autodetect_parallelism( parallelism, ray.util.get_current_placement_group(), DataContext.get_current(), @@ -194,7 +193,7 @@ def range(n: int, *, parallelism: int = -1) -> Dataset: >>> import ray >>> ds = ray.data.range(10000) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=10000, schema={id: int64}) + Dataset(num_blocks=200, num_rows=10000, schema={id: int64}) >>> ds.map(lambda x: {"id": x["id"] * 2}).take(4) # doctest: +SKIP [{"id": 0}, {"id": 2}, {"id": 4}, {"id": 6}] @@ -318,12 +317,9 @@ def read_datasource( force_local = True if force_local: - ( - requested_parallelism, - min_safe_parallelism, - inmemory_size, - read_tasks, - ) = _get_read_tasks(datasource, ctx, cur_pg, parallelism, local_uri, read_args) + requested_parallelism, min_safe_parallelism, read_tasks = _get_read_tasks( + datasource, ctx, cur_pg, parallelism, local_uri, read_args + ) else: # Prepare read in a remote task at same node. # NOTE: in Ray client mode, this is expected to be run on head node. @@ -336,12 +332,7 @@ def read_datasource( _get_read_tasks, retry_exceptions=False, num_cpus=0 ).options(scheduling_strategy=scheduling_strategy) - ( - requested_parallelism, - min_safe_parallelism, - inmemory_size, - read_tasks, - ) = ray.get( + requested_parallelism, min_safe_parallelism, read_tasks = ray.get( get_read_tasks.remote( datasource, ctx, @@ -352,35 +343,28 @@ def read_datasource( ) ) - # Compute the number of blocks the read will return. If the number of blocks is - # expected to be less than the requested parallelism, boost the number of blocks - # by adding an additional split into `k` pieces to each read task. - if read_tasks: - if inmemory_size: - expected_block_size = inmemory_size / len(read_tasks) - logger.debug(f"Expected block size {expected_block_size}") - size_based_splits = round( - max(1, expected_block_size / ctx.target_max_block_size) - ) - else: - size_based_splits = 1 - logger.debug(f"Size based split factor {size_based_splits}") - estimated_num_blocks = len(read_tasks) * size_based_splits - logger.debug(f"Blocks after size splits {estimated_num_blocks}") - - # Add more output splitting for each read task if needed. - if estimated_num_blocks < requested_parallelism: - k = math.ceil(requested_parallelism / estimated_num_blocks) - logger.info( - f"To satisfy the requested parallelism of {requested_parallelism}, " - f"each read task output will be split into {k} smaller blocks." 
- ) - for r in read_tasks: - r._set_additional_split_factor(k) - estimated_num_blocks = estimated_num_blocks * k - logger.debug("Estimated num output blocks {estimated_num_blocks}") - else: - estimated_num_blocks = 0 + if read_tasks and len(read_tasks) < min_safe_parallelism * 0.7: + perc = 1 + round((min_safe_parallelism - len(read_tasks)) / len(read_tasks), 1) + logger.warning( + f"{WARN_PREFIX} The blocks of this dataset are estimated to be {perc}x " + "larger than the target block size " + f"of {int(ctx.target_max_block_size / 1024 / 1024)} MiB. This may lead to " + "out-of-memory errors during processing. Consider reducing the size of " + "input files or using `.repartition(n)` to increase the number of " + "dataset blocks." + ) + elif len(read_tasks) < requested_parallelism and ( + len(read_tasks) < ray.available_resources().get("CPU", 1) // 2 + ): + logger.warning( + f"{WARN_PREFIX} The number of blocks in this dataset " + f"({len(read_tasks)}) " + f"limits its parallelism to {len(read_tasks)} concurrent tasks. " + "This is much less than the number " + "of available CPU slots in the cluster. Use `.repartition(n)` to " + "increase the number of " + "dataset blocks." + ) read_stage_name = f"Read{datasource.get_name()}" available_cpu_slots = ray.available_resources().get("CPU", 1) @@ -406,11 +390,10 @@ def read_datasource( ray_remote_args=ray_remote_args, owned_by_consumer=False, ) - block_list._estimated_num_blocks = estimated_num_blocks # TODO(hchen): move _get_read_tasks and related code to the Read physical operator, # after removing LazyBlockList code path. - read_op = Read(datasource, read_tasks, estimated_num_blocks, ray_remote_args) + read_op = Read(datasource, read_tasks, ray_remote_args) logical_plan = LogicalPlan(read_op) return Dataset( @@ -533,7 +516,7 @@ def read_parquet( >>> ray.data.read_parquet("example://iris.parquet", ... schema=pa.schema(fields)) Dataset( - num_blocks=..., + num_blocks=1, num_rows=150, schema={ sepal.length: double, @@ -633,13 +616,13 @@ def read_images( >>> path = "s3://anonymous@air-example-data-2/movie-image-small-filesize-1GB" >>> ds = ray.data.read_images(path) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8)}) + Dataset(num_blocks=200, num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8)}) If you need image file paths, set ``include_paths=True``. >>> ds = ray.data.read_images(path, include_paths=True) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8), path: string}) + Dataset(num_blocks=200, num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8), path: string}) >>> ds.take(1)[0]["path"] # doctest: +SKIP 'air-example-data-2/movie-image-small-filesize-1GB/0.jpg' @@ -662,7 +645,7 @@ def read_images( >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root) >>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=94946, schema={image: TensorDtype(shape=(224, 224, 3), dtype=uint8), class: object}) + Dataset(num_blocks=176, num_rows=94946, schema={image: TensorDtype(shape=(224, 224, 3), dtype=uint8), class: object}) Args: paths: A single file/directory path or a list of file/directory paths. 
@@ -1765,22 +1748,22 @@ def from_huggingface( >>> ray_ds = ray.data.from_huggingface(hf_dataset) >>> ray_ds {'train': MaterializedDataset( - num_blocks=..., + num_blocks=1, num_rows=3257, schema={text: string, label: int64} ), 'test': MaterializedDataset( - num_blocks=..., + num_blocks=1, num_rows=1421, schema={text: string, label: int64} ), 'validation': MaterializedDataset( - num_blocks=..., + num_blocks=1, num_rows=374, schema={text: string, label: int64} )} >>> ray_ds = ray.data.from_huggingface(hf_dataset["train"]) >>> ray_ds MaterializedDataset( - num_blocks=..., + num_blocks=1, num_rows=3257, schema={text: string, label: int64} ) @@ -1848,7 +1831,7 @@ def from_tf( >>> dataset, _ = tfds.load('cifar10', split=["train", "test"]) # doctest: +SKIP >>> ds = ray.data.from_tf(dataset) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=50000, schema={id: binary, image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8), label: int64}) + Dataset(num_blocks=200, num_rows=50000, schema={id: binary, image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8), label: int64}) >>> ds.take(1) # doctest: +SKIP [{'id': b'train_16399', 'image': array([[[143, 96, 70], [141, 96, 72], @@ -1902,7 +1885,7 @@ def from_torch( >>> dataset = datasets.MNIST("data", download=True) # doctest: +SKIP >>> ds = ray.data.from_torch(dataset) # doctest: +SKIP >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=60000, schema={item: object}) + Dataset(num_blocks=200, num_rows=60000, schema={item: object}) >>> ds.take(1) # doctest: +SKIP {"item": (, 5)} @@ -1922,7 +1905,7 @@ def _get_read_tasks( parallelism: int, local_uri: bool, kwargs: dict, -) -> Tuple[int, int, Optional[int], List[ReadTask]]: +) -> Tuple[int, int, List[ReadTask]]: """Generates read tasks. Args: @@ -1934,20 +1917,19 @@ def _get_read_tasks( Returns: Request parallelism from the datasource, the min safe parallelism to avoid - OOM, the estimated inmemory data size, and list of read tasks generated. + OOM, and the list of read tasks generated. """ kwargs = _unwrap_arrow_serialization_workaround(kwargs) if local_uri: kwargs["local_uri"] = local_uri DataContext._set_current(ctx) reader = ds.create_reader(**kwargs) - requested_parallelism, min_safe_parallelism, mem_size = _autodetect_parallelism( + requested_parallelism, min_safe_parallelism = _autodetect_parallelism( parallelism, cur_pg, DataContext.get_current(), reader ) return ( requested_parallelism, min_safe_parallelism, - mem_size, reader.get_read_tasks(requested_parallelism), ) diff --git a/python/ray/data/tests/test_auto_parallelism.py b/python/ray/data/tests/test_auto_parallelism.py index a163093697146..55355eef24268 100644 --- a/python/ray/data/tests/test_auto_parallelism.py +++ b/python/ray/data/tests/test_auto_parallelism.py @@ -87,7 +87,7 @@ class MockReader: def estimate_inmemory_data_size(self): return data_size - result, _, _ = _autodetect_parallelism( + result, _ = _autodetect_parallelism( parallelism=-1, cur_pg=None, ctx=DataContext.get_current(), diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 5e4b9c41bf149..c832033289d82 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -157,7 +157,7 @@ def test_empty_dataset(ray_start_regular_shared): ds = ds.materialize() assert ( str(ds) - == "MaterializedDataset(num_blocks=2, num_rows=0, schema=Unknown schema)" + == "MaterializedDataset(num_blocks=1, num_rows=0, schema=Unknown schema)" ) # Test map on empty dataset. 
@@ -1468,6 +1468,7 @@ def test_read_write_local_node(ray_start_cluster): def check_dataset_is_local(ds): blocks = ds.get_internal_block_refs() + assert len(blocks) == num_files ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) locations = [] @@ -1487,7 +1488,7 @@ def check_dataset_is_local(ds): check_dataset_is_local(ds) # With fusion. - ds = ray.data.read_parquet(local_path, parallelism=1).map(lambda x: x).materialize() + ds = ray.data.read_parquet(local_path).map(lambda x: x).materialize() check_dataset_is_local(ds) # Write back to local scheme. @@ -1665,7 +1666,7 @@ def test_dataset_plan_as_string(ray_start_cluster): ds = ray.data.read_parquet("example://iris.parquet") assert ds._plan.get_plan_as_string("Dataset") == ( "Dataset(\n" - " num_blocks=8,\n" + " num_blocks=1,\n" " num_rows=150,\n" " schema={\n" " sepal.length: double,\n" @@ -1685,7 +1686,7 @@ def test_dataset_plan_as_string(ray_start_cluster): " +- MapBatches()\n" " +- MapBatches()\n" " +- Dataset(\n" - " num_blocks=8,\n" + " num_blocks=1,\n" " num_rows=150,\n" " schema={\n" " sepal.length: double,\n" diff --git a/python/ray/data/tests/test_csv.py b/python/ray/data/tests/test_csv.py index 2273cb9196fe7..6fe4f74eae527 100644 --- a/python/ray/data/tests/test_csv.py +++ b/python/ray/data/tests/test_csv.py @@ -660,7 +660,7 @@ def keep_expected_partitions(kv_dict): data_path, partition_filter=partition_path_filter, filesystem=fs, - parallelism=6, + parallelism=100, ) assert_base_partitioned_ds(ds, num_input_files=6, num_computed=6) assert ray.get(kept_file_counter.get.remote()) == 6 diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 58f9ed755b9b9..ab7a61b564c7a 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -72,7 +72,7 @@ def _check_usage_record(op_names: List[str], clear_after_check: Optional[bool] = def test_read_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - op = Read(ParquetDatasource(), [], 0) + op = Read(ParquetDatasource(), []) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -149,7 +149,7 @@ def method(self, x): for udf, expected_name in zip(udf_list, expected_names): op = MapRows( - Read(ParquetDatasource(), [], 0), + Read(ParquetDatasource(), []), udf, ) assert op.name == f"Map({expected_name})" @@ -157,7 +157,7 @@ def method(self, x): def test_map_batches_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapBatches( read_op, lambda x: x, @@ -180,7 +180,7 @@ def test_map_batches_e2e(ray_start_regular_shared, enable_optimizer): def test_map_rows_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapRows( read_op, lambda x: x, @@ -203,7 +203,7 @@ def test_map_rows_e2e(ray_start_regular_shared, enable_optimizer): def test_filter_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = Filter( read_op, lambda x: x, @@ -226,7 +226,7 @@ def test_filter_e2e(ray_start_regular_shared, enable_optimizer): def test_flat_map(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = 
Read(ParquetDatasource(), []) op = FlatMap( read_op, lambda x: x, @@ -285,7 +285,7 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): def test_random_shuffle_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = RandomShuffle( read_op, seed=0, @@ -317,7 +317,7 @@ def test_random_shuffle_e2e( ) def test_repartition_operator(ray_start_regular_shared, enable_optimizer, shuffle): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = Repartition(read_op, num_outputs=5, shuffle=shuffle) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag @@ -383,9 +383,9 @@ def _check_repartition_usage_and_stats(ds): @pytest.mark.parametrize("preserve_order", (True, False)) def test_union_operator(ray_start_regular_shared, enable_optimizer, preserve_order): planner = Planner() - read_parquet_op = Read(ParquetDatasource(), [], 0) - read_range_op = Read(RangeDatasource(), [], 0) - read_json_op = Read(JSONDatasource(), [], 0) + read_parquet_op = Read(ParquetDatasource(), []) + read_range_op = Read(RangeDatasource(), []) + read_json_op = Read(JSONDatasource(), []) union_op = Union( read_parquet_op, read_range_op, @@ -451,7 +451,7 @@ def test_union_e2e(ray_start_regular_shared, enable_optimizer, preserve_order): def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that Read is fused with MapBatches. planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapBatches( read_op, lambda x: x, @@ -471,7 +471,7 @@ def test_read_map_batches_operator_fusion(ray_start_regular_shared, enable_optim def test_read_map_chain_operator_fusion(ray_start_regular_shared, enable_optimizer): # Test that a chain of different map operators are fused. planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapRows(read_op, lambda x: x) op = MapBatches(op, lambda x: x) op = FlatMap(op, lambda x: x) @@ -517,7 +517,6 @@ def test_read_map_batches_operator_fusion_compatible_remote_args( read_op = Read( ParquetDatasource(), [], - 0, # This case is testing fusing the following 2 map_batches operators. # So we add incompatible remote args to the read op to make sure # it doesn't get fused. @@ -566,7 +565,6 @@ def test_read_map_batches_operator_fusion_incompatible_remote_args( read_op = Read( ParquetDatasource(), [], - 0, # This case is testing fusing the following 2 map_batches operators. # So we add incompatible remote args to the read op to make sure # it doesn't get fused. @@ -601,7 +599,7 @@ def test_read_map_batches_operator_fusion_compute_tasks_to_actors( # Test that a task-based map operator is fused into an actor-based map operator when # the former comes before the latter. planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x) op = MapBatches(op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) @@ -621,7 +619,7 @@ def test_read_map_batches_operator_fusion_compute_read_to_actors( ): # Test that reads fuse into an actor-based map operator. 
planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) logical_plan = LogicalPlan(op) physical_plan = planner.plan(logical_plan) @@ -640,7 +638,7 @@ def test_read_map_batches_operator_fusion_incompatible_compute( ): # Test that map operators are not fused when compute strategies are incompatible. planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, compute=ray.data.ActorPoolStrategy()) op = MapBatches(op, lambda x: x) logical_plan = LogicalPlan(op) @@ -664,7 +662,7 @@ def test_read_map_batches_operator_fusion_target_block_size( # Test that fusion of map operators merges their block sizes in the expected way # (taking the max). planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = MapBatches(read_op, lambda x: x, target_block_size=2) op = MapBatches(op, lambda x: x, target_block_size=5) op = MapBatches(op, lambda x: x, target_block_size=3) @@ -866,7 +864,7 @@ def test_write_fusion(ray_start_regular_shared, enable_optimizer, tmp_path): def test_write_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() datasource = ParquetDatasource() - read_op = Read(datasource, [], 0) + read_op = Read(datasource, []) op = Write( read_op, datasource, @@ -882,7 +880,7 @@ def test_write_operator(ray_start_regular_shared, enable_optimizer): def test_sort_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = Sort( read_op, key="col1", @@ -957,7 +955,7 @@ def test_sort_validate_keys( def test_aggregate_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read(ParquetDatasource(), [], 0) + read_op = Read(ParquetDatasource(), []) op = Aggregate( read_op, key="col1", @@ -1027,8 +1025,8 @@ def test_aggregate_validate_keys( def test_zip_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op1 = Read(ParquetDatasource(), [], 0) - read_op2 = Read(ParquetDatasource(), [], 0) + read_op1 = Read(ParquetDatasource(), []) + read_op2 = Read(ParquetDatasource(), []) op = Zip(read_op1, read_op2) plan = LogicalPlan(op) physical_op = planner.plan(plan).dag diff --git a/python/ray/data/tests/test_mongo.py b/python/ray/data/tests/test_mongo.py index 5574946e76d48..bff4bf5192fb9 100644 --- a/python/ray/data/tests/test_mongo.py +++ b/python/ray/data/tests/test_mongo.py @@ -131,7 +131,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): ) assert str(ds) == ( "Dataset(\n" - " num_blocks=200,\n" + " num_blocks=5,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" @@ -148,7 +148,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): ) assert str(ds) == ( "Dataset(\n" - " num_blocks=1000,\n" + " num_blocks=5,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" @@ -247,7 +247,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): ).materialize() assert str(ds) == ( "MaterializedDataset(\n" - " num_blocks=200,\n" + " num_blocks=5,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" @@ -265,7 +265,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): ) assert 
str(ds) == ( "Dataset(\n" - " num_blocks=1000,\n" + " num_blocks=5,\n" " num_rows=5,\n" " schema={_id: fixed_size_binary[12], float_field: double, " "int_field: int32}\n" diff --git a/python/ray/data/tests/test_numpy.py b/python/ray/data/tests/test_numpy.py index 995da1f78862a..082f65c24b075 100644 --- a/python/ray/data/tests/test_numpy.py +++ b/python/ray/data/tests/test_numpy.py @@ -130,7 +130,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): path = os.path.join(tmp_path, "test_np_dir") os.mkdir(path) np.save(os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1)) - ds = ray.data.read_numpy(path, parallelism=1) + ds = ray.data.read_numpy(path) assert str(ds) == ( "Dataset(\n" " num_blocks=1,\n" @@ -146,7 +146,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): with open(os.path.join(path, "foo.txt"), "w") as f: f.write("foobar") - ds = ray.data.read_numpy(path, parallelism=1) + ds = ray.data.read_numpy(path) assert ds.num_blocks() == 1 assert ds.count() == 10 assert str(ds) == ( @@ -186,9 +186,7 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): os.mkdir(path) path = os.path.join(path, "test.npy") np.save(path, np.expand_dims(np.arange(0, 10), 1)) - ds = ray.data.read_numpy( - path, meta_provider=FastFileMetadataProvider(), parallelism=1 - ) + ds = ray.data.read_numpy(path, meta_provider=FastFileMetadataProvider()) assert str(ds) == ( "Dataset(\n" " num_blocks=1,\n" diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index c69ba0e4bd40b..4dfc1a8a1d2e5 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -1045,6 +1045,8 @@ def get_node_id(): # Force reads. blocks = ds.get_internal_block_refs() + assert len(blocks) == 2 + ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) locations = [] diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py index 797d60a7bb4ee..79048dd4e1f27 100644 --- a/python/ray/data/tests/test_randomize_block_order.py +++ b/python/ray/data/tests/test_randomize_block_order.py @@ -21,9 +21,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): planner = Planner() - read_op = Read( - datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0 - ) + read_op = Read(datasource=ParquetDatasource(), read_tasks=[]) op = RandomizeBlocks( read_op, seed=0, @@ -38,7 +36,7 @@ def test_randomize_blocks_operator(ray_start_regular_shared, enable_optimizer): def test_randomize_block_order_rule(): - read = Read(datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0) + read = Read(datasource=ParquetDatasource(), read_tasks=[]) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=None) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -61,7 +59,7 @@ def test_randomize_block_order_rule(): def test_randomize_block_order_rule_seed(): - read = Read(datasource=ParquetDatasource(), read_tasks=[], estimated_num_blocks=0) + read = Read(datasource=ParquetDatasource(), read_tasks=[]) operator1 = RandomizeBlocks(input_op=read, seed=None) operator2 = RandomizeBlocks(input_op=operator1, seed=2) operator3 = MapBatches(input_op=operator2, fn=lambda x: x) @@ -88,7 +86,7 @@ def test_randomize_block_order_rule_seed(): def test_randomize_block_order_after_repartition(): - read = Read(datasource=ParquetDatasource(), 
read_tasks=[], estimated_num_blocks=0) + read = Read(datasource=ParquetDatasource(), read_tasks=[]) operator1 = RandomizeBlocks(input_op=read) operator2 = Repartition(input_op=operator1, num_outputs=1, shuffle=False) operator3 = RandomizeBlocks(input_op=operator2) diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 11245da54c3c7..2b9c61752b8d1 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -86,7 +86,7 @@ def gen(name): ray.data.range(1000, parallelism=1).map( lambda _: {"out": LARGE_VALUE} ).write_csv(path) - return ray.data.read_csv(path, parallelism=1) + return ray.data.read_csv(path) # 20MiB ctx.target_max_block_size = 20_000_000 @@ -132,7 +132,7 @@ def gen(name): # will only write to one file, even though there are multiple # blocks created by block splitting. ds.write_parquet(path) - return ray.data.read_parquet(path, parallelism=1) + return ray.data.read_parquet(path, parallelism=200) # 20MiB ctx.target_max_block_size = 20_000_000 @@ -143,17 +143,17 @@ def gen(name): ctx.target_max_block_size = 3_000_000 ds2 = gen("out2") nrow = ds2._block_num_rows() - assert 2 < len(nrow) < 5, nrow + assert 3 < len(nrow) < 5, nrow for x in nrow[:-1]: - assert 50000 < x < 95000, (x, nrow) + assert 50000 < x < 75000, (x, nrow) # 1MiB ctx.target_max_block_size = 1_000_000 ds3 = gen("out3") nrow = ds3._block_num_rows() - assert 6 < len(nrow) < 12, nrow + assert 8 < len(nrow) < 12, nrow for x in nrow[:-1]: - assert 20000 < x < 35000, (x, nrow) + assert 20000 < x < 25000, (x, nrow) @pytest.mark.parametrize("use_actors", [False, True]) diff --git a/python/ray/data/tests/test_splitblocks.py b/python/ray/data/tests/test_splitblocks.py deleted file mode 100644 index f5839b8e85de6..0000000000000 --- a/python/ray/data/tests/test_splitblocks.py +++ /dev/null @@ -1,80 +0,0 @@ -import numpy as np -import pytest - -import ray -from ray.data.datasource.datasource import _splitrange -from ray.data.tests.conftest import * # noqa -from ray.tests.conftest import * # noqa - - -def test_splitrange(): - def f(n, k): - assert _splitrange(n, k) == [len(a) for a in np.array_split(range(n), k)] - - f(0, 1) - f(5, 1) - f(5, 3) - f(5, 5) - f(5, 10) - f(50, 1) - f(50, 2) - f(50, 3) - f(50, 4) - f(50, 5) - - -def test_small_file_split(ray_start_10_cpus_shared): - ds = ray.data.read_csv("example://iris.csv", parallelism=1) - assert ds.num_blocks() == 1 - assert ds.materialize().num_blocks() == 1 - assert ds.map_batches(lambda x: x).materialize().num_blocks() == 1 - - ds = ds.map_batches(lambda x: x).materialize() - stats = ds.stats() - assert "Stage 1 ReadCSV->MapBatches" in stats, stats - - ds = ray.data.read_csv("example://iris.csv", parallelism=10) - assert ds.num_blocks() == 1 - assert ds.map_batches(lambda x: x).materialize().num_blocks() == 10 - assert ds.materialize().num_blocks() == 10 - - ds = ray.data.read_csv("example://iris.csv", parallelism=100) - assert ds.num_blocks() == 1 - assert ds.map_batches(lambda x: x).materialize().num_blocks() == 100 - assert ds.materialize().num_blocks() == 100 - - ds = ds.map_batches(lambda x: x).materialize() - stats = ds.stats() - assert "Stage 1 ReadCSV->SplitBlocks(100)" in stats, stats - assert "Stage 2 MapBatches" in stats, stats - - -def test_large_file_additional_split(ray_start_10_cpus_shared, tmp_path): - ctx = ray.data.context.DataContext.get_current() - ctx.target_max_block_size = 10 * 1024 * 1024 - - # ~100MiB of tensor data - ds = 
ray.data.range_tensor(1000, shape=(10000,)) - ds.repartition(1).write_parquet(tmp_path) - - ds = ray.data.read_parquet(tmp_path, parallelism=1) - assert ds.num_blocks() == 1 - assert 5 < ds.materialize().num_blocks() < 20 # Size-based block split - - ds = ray.data.read_parquet(tmp_path, parallelism=10) - assert ds.num_blocks() == 1 - assert 5 < ds.materialize().num_blocks() < 20 - - ds = ray.data.read_parquet(tmp_path, parallelism=100) - assert ds.num_blocks() == 1 - assert 50 < ds.materialize().num_blocks() < 200 - - ds = ray.data.read_parquet(tmp_path, parallelism=1000) - assert ds.num_blocks() == 1 - assert 500 < ds.materialize().num_blocks() < 2000 - - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/train/tests/test_batch_predictor.py b/python/ray/train/tests/test_batch_predictor.py index d3f64fcf273ce..c6d8c6e044ef7 100644 --- a/python/ray/train/tests/test_batch_predictor.py +++ b/python/ray/train/tests/test_batch_predictor.py @@ -505,7 +505,7 @@ def test_get_and_set_preprocessor(): test_dataset = ray.data.range(4) output_ds = batch_predictor.predict(test_dataset) - assert sorted(output_ds.to_pandas().to_numpy().squeeze().tolist()) == [ + assert output_ds.to_pandas().to_numpy().squeeze().tolist() == [ 0.0, 2.0, 4.0, @@ -517,7 +517,7 @@ def test_get_and_set_preprocessor(): assert batch_predictor.get_preprocessor() == preprocessor2 output_ds = batch_predictor.predict(test_dataset) - assert sorted(output_ds.to_pandas().to_numpy().squeeze().tolist()) == [ + assert output_ds.to_pandas().to_numpy().squeeze().tolist() == [ 0.0, 4.0, 8.0, @@ -529,7 +529,7 @@ def test_get_and_set_preprocessor(): assert batch_predictor.get_preprocessor() is None output_ds = batch_predictor.predict(test_dataset) - assert sorted(output_ds.to_pandas().to_numpy().squeeze().tolist()) == [ + assert output_ds.to_pandas().to_numpy().squeeze().tolist() == [ 0.0, 2.0, 4.0,
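
The documentation changes above describe how block counts are controlled after this change: read parallelism is capped by the number of input files, and `Dataset.repartition()` adjusts the block count afterwards. A minimal sketch of that workflow using only the public API referenced in the diff (the `example://iris.csv` path ships with Ray; exact block counts may vary with file sizes and cluster resources):

```python
import ray

# A single small input file caps the read at one task, so requesting a higher
# parallelism still yields one block.
ds = ray.data.read_csv("example://iris.csv", parallelism=100)
print(ds.num_blocks())  # expected: 1

# To get finer-grained parallelism for downstream operators, repartition after
# reading instead of relying on read-time splitting.
ds = ds.repartition(10)
print(ds.materialize().num_blocks())  # expected: 10
```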
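
For reference, the read-task output splitting removed by this patch sized its extra splits with the `_splitrange` helper (deleted from `datasource.py` along with `test_splitblocks.py`). A standalone sketch of that deleted logic, cross-checked against NumPy the same way the deleted test did:

```python
import numpy as np


def splitrange(n: int, k: int) -> list:
    """Return the chunk lengths that np.array_split(range(n), k) would produce."""
    base = n // k
    output = [base] * k
    rem = n - sum(output)
    # Hand the remainder out one row at a time to the leading chunks.
    for i in range(len(output)):
        if rem > 0:
            output[i] += 1
            rem -= 1
    assert sum(output) == n, (output, n, k)
    return output


# Mirror a few cases from the deleted test_splitrange.
for n, k in [(0, 1), (5, 3), (5, 10), (50, 4)]:
    assert splitrange(n, k) == [len(a) for a in np.array_split(range(n), k)]
```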
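
The `_autodetect_parallelism` change above drops the estimated in-memory size from the return value, leaving a two-element tuple. A small sketch of calling the internal helper with the keyword names shown in the diffed test; this is a private API and the `avail_cpus=4` override is only used here to avoid probing a live cluster:

```python
from ray.data._internal.util import _autodetect_parallelism
from ray.data.context import DataContext

detected_parallelism, min_safe_parallelism = _autodetect_parallelism(
    parallelism=-1,                 # -1 requests auto-detection
    cur_pg=None,                    # no placement group
    ctx=DataContext.get_current(),
    avail_cpus=4,                   # testing-only override of CPU detection
)
print(detected_parallelism, min_safe_parallelism)
```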