[Datasets] Defer first block computation when reading a Datasource with schema information in metadata #34251
Conversation
Signed-off-by: Scott Lee <sjl@anyscale.com>
# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take_all()]
- check_num_computed(ds, 2, 2)
+ check_num_computed(ds, 2, 0)
Why do we have a difference between bulk and streaming?
The comment here explains it:

ray/python/ray/data/tests/test_dataset_parquet.py, lines 37 to 40 in 3f9b238:

# When the streaming executor is on, _num_computed() is affected only
# by ds.schema(), which will still partially read the blocks, but it is
# not affected by operations like take(), since those run through the
# streaming executor.
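For context, here is a minimal sketch of what a helper like `check_num_computed` plausibly does, inferred from its call sites and the comment above. The `DatasetContext.use_streaming_executor` flag and the `ds._plan.execute()._num_computed()` internals are assumptions about the Ray Data test internals of that time, not a verbatim copy of the helper:

```python
from ray.data.context import DatasetContext  # assumed import; newer Ray versions name this DataContext


def check_num_computed(ds, expected, streaming_expected) -> None:
    # Bulk executor: take()/take_all() eagerly compute blocks, so the count
    # reflects every read performed so far (`expected`).
    # Streaming executor: only the partial read done by ds.schema() shows up;
    # take() runs through the streaming executor and is not counted
    # (`streaming_expected`).
    if DatasetContext.get_current().use_streaming_executor:
        assert ds._plan.execute()._num_computed() == streaming_expected
    else:
        assert ds._plan.execute()._num_computed() == expected
```

Read that way, the assertions changed in this PR say the streaming count drops from 2 to 0 because, after this change, `ds.schema()` no longer needs to partially read any block.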
@@ -475,11 +479,13 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path):
    [3, "f"],
    [3, "g"],
]
check_num_computed(ds, 2, 0)
ditto
# Test column selection.
ds = ray.data.read_parquet(data_path, columns=["one"], filesystem=fs)
values = [s["one"] for s in ds.take()]
assert sorted(values) == [1, 1, 1, 3, 3, 3]
check_num_computed(ds, 2, 0)
ditto
# Forces a data read.
values = [[s["one"], s["two"]] for s in ds.take()]
- check_num_computed(ds, 2, 2)
+ check_num_computed(ds, 2, 0)
ditto
Merging to master.
[Datasets] Defer first block computation when reading a Datasource with schema information in metadata (ray-project#34251)

In the current implementation of [ExecutionPlan._get_unified_blocks_schema](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/plan.py#L418), we force execution to compute the first block when given a `LazyBlockList`. However, when creating a Dataset from a datasource that has schema information available before reading (e.g. Parquet), this unnecessarily forces execution, since we already check for metadata in the subsequent [ensure_metadata_for_first_block](https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/lazy_block_list.py#L379). Therefore, we can remove `blocks.compute_first_block()`.

Signed-off-by: Scott Lee <sjl@anyscale.com>
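For orientation, here is a rough, hypothetical sketch of the shape of this change. Only `compute_first_block()` and `ensure_metadata_for_first_block()` are taken from the description above; the method signature, `get_metadata()`, and the metadata fields are assumptions about the internals, not the verbatim implementation:

```python
# Simplified sketch of ExecutionPlan._get_unified_blocks_schema (not verbatim).
def _get_unified_blocks_schema(self, blocks, fetch_if_missing: bool):
    if isinstance(blocks, LazyBlockList):
        # Before this PR, blocks.compute_first_block() was also called here,
        # forcing the first read task to run just to learn the schema.
        # It is dropped because the metadata check below already covers the
        # common case: datasources such as Parquet attach the schema to the
        # read-task metadata up front.
        blocks.ensure_metadata_for_first_block()

    # Answer the schema question from metadata alone when possible
    # (assumed helper/fields: get_metadata(), BlockMetadata.schema).
    for meta in blocks.get_metadata(fetch_if_missing=False):
        if meta.schema is not None:
            return meta.schema

    # Otherwise, only execute blocks if the caller explicitly asks for it.
    if fetch_if_missing:
        ...  # fall back to computing a block to obtain its schema
    return None
```

The user-visible effect is that something like `ray.data.read_parquet(path).schema()` can be answered from Parquet metadata without launching a read task, while consumption APIs such as `take()` still trigger the actual reads.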
Why are these changes needed?
See #33943
Related issue number
Closes #33943
Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.