[Data] Add option for parallelizing post-collation data batch operations in DataIterator.iter_batches() #36842

Merged: 37 commits, merged Jul 7, 2023

Commits (the changes shown below are from the first commit only):
122986f  add gpu_prefetch_batches param (Jun 26, 2023)
4cdc646  Merge branch 'master' into gpu-prefetch-batches-args (Jun 26, 2023)
0a24b87  fix test (Jun 27, 2023)
b12d0bf  Merge branch 'master' of https://github.com/ray-project/ray into gpu-… (Jun 27, 2023)
7980c08  fix test (Jun 27, 2023)
702b8f7  separate collate from format (Jun 28, 2023)
1845348  Merge branch 'master' into gpu-prefetch-batches-args (Jun 28, 2023)
8bca783  initial finalize_fn rework (Jun 29, 2023)
30bc885  Merge branch 'master' into gpu-prefetch-batches-args (Jun 29, 2023)
68bd137  missing param (Jun 29, 2023)
1060bd6  complete (Jun 29, 2023)
3fa1af6  Merge branch 'master' into gpu-prefetch-batches-args (Jun 29, 2023)
5589a51  update pipeline path (Jun 29, 2023)
1165ee2  docs (Jun 29, 2023)
ecd64da  Merge branch 'master' into gpu-prefetch-batches-args (Jun 29, 2023)
a40e391  tests (Jun 29, 2023)
2c7f00c  fix case with no finalize_fn (Jun 30, 2023)
34507aa  1/n comments (Jun 30, 2023)
131c5db  slim down finalize_fn usage and make private (Jun 30, 2023)
0da72f5  Merge branch 'master' into gpu-prefetch-batches-args (Jun 30, 2023)
0d8afe5  comments (Jun 30, 2023)
f978348  address amog's comments (Jun 30, 2023)
e436ae6  Merge branch 'master' into gpu-prefetch-batches-args (Jun 30, 2023)
f2abe90  comments (Jul 3, 2023)
6acbd42  Merge branch 'master' into gpu-prefetch-batches-args (Jul 3, 2023)
10d54fa  additional tests (Jul 3, 2023)
7635b91  Merge branch 'master' into gpu-prefetch-batches-args (Jul 3, 2023)
8b94c99  add early failure so test doesnt hang until timeout (Jul 3, 2023)
37ead20  Merge branch 'master' into gpu-prefetch-batches-args (Jul 5, 2023)
30743d6  address amog's comments (Jul 5, 2023)
3473d47  update (amogkam, Jul 5, 2023)
8f40e1d  fix (amogkam, Jul 5, 2023)
c029f00  fix (amogkam, Jul 5, 2023)
4e93946  cleanup (amogkam, Jul 6, 2023)
f91a583  fix (amogkam, Jul 6, 2023)
62320e4  fix test (amogkam, Jul 6, 2023)
2121f96  update (amogkam, Jul 6, 2023)
commit 122986f9e935e0cceef961c89ea880040e2814d9
add gpu_prefetch_batches param
Signed-off-by: Scott Lee <sjl@anyscale.com>
Scott Lee committed Jun 26, 2023
13 changes: 9 additions & 4 deletions python/ray/data/_internal/block_batching/iter_batches.py
@@ -34,6 +34,7 @@ def iter_batches(
shuffle_seed: Optional[int] = None,
ensure_copy: bool = False,
prefetch_batches: int = 1,
gpu_prefetch_batches: int = 1,
Contributor commented:

Shouldn't this be for iter_torch_batches() only?

scottjlee (Contributor Author) commented on Jun 28, 2023:

DataIterator.iter_torch_batches() calls DataIterator.iter_batches(), which calls this iter_batches function in block_batching/iter_batches.py, so I believe we still need to expose this param here.
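
For context, a minimal sketch of that call chain (illustrative only; simplified signatures, not the actual Ray source):

# How the new parameter threads through the three layers named above.
def _block_batching_iter_batches(*, gpu_prefetch_batches=1, **kwargs):
    # Stands in for iter_batches() in block_batching/iter_batches.py.
    print(f"collating with gpu_prefetch_batches={gpu_prefetch_batches}")
    yield from ()

class DataIteratorSketch:
    def iter_batches(self, *, gpu_prefetch_batches=1, **kwargs):
        return _block_batching_iter_batches(
            gpu_prefetch_batches=gpu_prefetch_batches, **kwargs
        )

    def iter_torch_batches(self, *, gpu_prefetch_batches=1, **kwargs):
        return self.iter_batches(gpu_prefetch_batches=gpu_prefetch_batches, **kwargs)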

) -> Iterator[DataBatch]:
"""Create formatted batches of data from an iterator of block object references and
corresponding metadata.
@@ -97,8 +98,13 @@ def iter_batches(
process. If set to greater than 0, a separate thread will be used to fetch
the specified amount of formatted batches from blocks. This improves
performance for non-CPU bound UDFs, allowing batch fetching compute and
formatting to be overlapped with the UDF. Defaults to 0 (no prefetching
enabled).
formatting to be overlapped with the UDF. Defaults to 1.
scottjlee (Contributor Author) commented:

updated the docs from 0 to 1, based on the current default value of 1 in the param definition.

gpu_prefetch_batches: The number of batches to fetch ahead of the current
batch to fetch on the GPU. If set to greater than 0, a separate
threadpool will be used to format batches and apply the collate_fn.
Defaults to 1. You can revert back to the old prefetching behavior
that uses `prefetch_blocks` by setting `use_legacy_iter_batches` to
True in the DataContext.
Contributor commented:

Given that it's unlikely someone would want this to be >1, I don't think the comment needs to mention the legacy behavior.


Returns:
An iterator over record batches.
@@ -119,7 +125,6 @@ def iter_batches(
def _async_iter_batches(
block_refs: Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
) -> Iterator[DataBatch]:

# Step 1: Prefetch logical batches locally.
block_refs = prefetch_batches_locally(
block_ref_iter=block_refs,
@@ -149,7 +154,7 @@ def _async_iter_batches(
stats=stats,
batch_format=batch_format,
collate_fn=collate_fn,
num_threadpool_workers=prefetch_batches,
num_threadpool_workers=gpu_prefetch_batches,
Contributor commented:

We still want full prefetching for the format conversion right? Just not the final GPU loading step.

)

# Step 5: Restore original order.
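
For intuition, the Step 4 threadpool implements the general pattern below: keep up to N batch-preparation tasks in flight so formatting/collation overlaps with downstream consumption. This is a generic sketch of that pattern, not Ray's implementation; prefetch_map is an illustrative name.

import concurrent.futures
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def prefetch_map(
    items: Iterator[T], fn: Callable[[T], U], num_ahead: int
) -> Iterator[U]:
    """Apply fn to each item, keeping up to num_ahead results in flight."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_ahead) as pool:
        pending = []
        for item in items:
            pending.append(pool.submit(fn, item))
            # Once the pipeline is full, yield the oldest result to preserve order.
            if len(pending) > num_ahead:
                yield pending.pop(0).result()
        # Drain the remaining in-flight work.
        for fut in pending:
            yield fut.result()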
15 changes: 11 additions & 4 deletions python/ray/data/dataset.py
@@ -2957,6 +2957,7 @@ def iter_batches(
self,
*,
prefetch_batches: int = 1,
gpu_prefetch_batches: int = 1,
batch_size: Optional[int] = 256,
batch_format: Optional[str] = "default",
drop_last: bool = False,
@@ -2978,10 +2979,15 @@ def iter_batches(
Args:
prefetch_batches: The number of batches to fetch ahead of the current batch
to fetch. If set to greater than 0, a separate threadpool will be used
to fetch the objects to the local node, format the batches, and apply
the collate_fn. Defaults to 1. You can revert back to the old
prefetching behavior that uses `prefetch_blocks` by setting
`use_legacy_iter_batches` to True in the datasetContext.
to fetch the objects to the local node. Defaults to 1. You can revert
back to the old prefetching behavior that uses `prefetch_blocks` by
setting `use_legacy_iter_batches` to True in the DataContext.
gpu_prefetch_batches: The number of batches to fetch ahead of the current
batch to fetch on the GPU. If set to greater than 0, a separate
threadpool will be used to format batches and apply the collate_fn.
Defaults to 1. You can revert back to the old prefetching behavior
that uses `prefetch_blocks` by setting `use_legacy_iter_batches` to
True in the DataContext.
batch_size: The number of rows in each batch, or None to use entire blocks
as batches (blocks may contain different number of rows).
The final batch may include fewer than ``batch_size`` rows if
@@ -3007,6 +3013,7 @@ def iter_batches(
logger.warning("The 'native' batch format has been renamed 'default'.")
return self.iterator().iter_batches(
prefetch_batches=prefetch_batches,
gpu_prefetch_batches=gpu_prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
batch_format=batch_format,
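
A usage sketch of the public API as it stands in this diff (dataset and parameter values are illustrative):

import ray

ds = ray.data.range(1000)
# prefetch_batches overlaps fetching and formatting with consumption;
# gpu_prefetch_batches (added in this PR) bounds how many collated batches
# are prepared ahead of the device-loading step.
for batch in ds.iter_batches(
    batch_size=256,
    prefetch_batches=4,
    gpu_prefetch_batches=2,
):
    pass  # consume the batch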
16 changes: 11 additions & 5 deletions python/ray/data/iterator.py
@@ -88,6 +88,7 @@ def iter_batches(
self,
*,
prefetch_batches: int = 1,
gpu_prefetch_batches: int = 1,
batch_size: int = 256,
batch_format: Optional[str] = "default",
drop_last: bool = False,
@@ -111,10 +112,15 @@ def iter_batches(
Args:
prefetch_batches: The number of batches to fetch ahead of the current batch
to fetch. If set to greater than 0, a separate threadpool will be used
to fetch the objects to the local node, format the batches, and apply
the collate_fn. Defaults to 1. You can revert back to the old
prefetching behavior that uses `prefetch_blocks` by setting
`use_legacy_iter_batches` to True in the DataContext.
to fetch the objects to the local node. Defaults to 1. You can revert
back to the old prefetching behavior that uses `prefetch_blocks` by
setting `use_legacy_iter_batches` to True in the DataContext.
gpu_prefetch_batches: The number of batches to fetch ahead of the current
batch to fetch on the GPU. If set to greater than 0, a separate
threadpool will be used to format batches and apply the collate_fn.
Defaults to 1. You can revert back to the old prefetching behavior
that uses `prefetch_blocks` by setting `use_legacy_iter_batches` to
True in the DataContext.
batch_size: The number of rows in each batch, or None to use entire blocks
as batches (blocks may contain different number of rows).
The final batch may include fewer than ``batch_size`` rows if
@@ -187,6 +193,7 @@ def drop_metadata(block_iterator):
shuffle_buffer_min_size=local_shuffle_buffer_size,
shuffle_seed=local_shuffle_seed,
prefetch_batches=prefetch_batches,
gpu_prefetch_batches=gpu_prefetch_batches,
)

if stats:
@@ -320,7 +327,6 @@ def iter_torch_batches(
)

if collate_fn is None:

# Automatically move torch tensors to the appropriate device.
if device is None:
default_device = get_device()
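
The hunk above touches the default collate path in iter_torch_batches(), which automatically moves tensors to the appropriate device when no collate_fn is given. A hand-written collate_fn with the same effect might look like this (illustrative sketch; assumes torch is installed and batches are dicts of numpy arrays):

import torch

def collate_to_device(batch):
    """Convert a dict of numpy arrays into torch tensors on the default device."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    return {k: torch.as_tensor(v).to(device) for k, v in batch.items()}

# Hypothetical usage with a DataIterator `it`:
# for batch in it.iter_torch_batches(collate_fn=collate_to_device):
#     ...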
16 changes: 13 additions & 3 deletions python/ray/data/tests/block_batching/test_iter_batches.py
@@ -143,7 +143,8 @@ def collate_fn(batch: pd.DataFrame):
assert concat_df["foo"].iloc[i + 1] >= concat_df["foo"].iloc[i]


def test_iter_batches_e2e_async(ray_start_regular_shared):
@pytest.mark.parametrize("gpu_prefetch_batches", [1, 2, 4])
def test_iter_batches_e2e_async(ray_start_regular_shared, gpu_prefetch_batches):
"""We add time.sleep in 3 places:
1. In the base generator to simulate streaming executor blocking on next results.
2. In the collate_fn to simulate expensive slicing/formatting/collation
@@ -160,7 +161,11 @@ def collate_fn(batch):
)
start_time = time.time()
output_batches = iter_batches(
block_refs_iter, batch_size=None, collate_fn=collate_fn, prefetch_batches=4
block_refs_iter,
batch_size=None,
collate_fn=collate_fn,
prefetch_batches=4,
gpu_prefetch_batches=gpu_prefetch_batches,
)
batches = []
for batch in output_batches:
@@ -171,7 +176,12 @@ def collate_fn(batch):
# 20 batches, 1.5 second sleep. Should be less than 45 seconds, even with some
# overhead.
# If there was no overlap, then we would expect this to take at least 20*2.5 = 50
assert end_time - start_time < 45, end_time - start_time
if gpu_prefetch_batches == 1:
assert end_time - start_time < 45, end_time - start_time
elif gpu_prefetch_batches == 2:
assert end_time - start_time < 40, end_time - start_time
elif gpu_prefetch_batches == 4:
assert end_time - start_time < 35, end_time - start_time

assert len(batches) == 20
assert all(len(batch) == 2 for batch in batches)
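
The widened time bounds follow from pipelining arithmetic: with no overlap, wall time is the sum of the per-batch stage costs; with full overlap, it approaches the slowest stage alone. A rough model using the test's numbers (the per-stage split below is an assumption for illustration; the test comment only establishes a 2.5 s combined cost per batch):

num_batches = 20
stage_costs_s = [1.0, 1.5]  # assumed split of the 2.5 s total per-batch sleep

serial_s = num_batches * sum(stage_costs_s)     # 50 s if nothing overlaps
pipelined_s = num_batches * max(stage_costs_s)  # ~30 s with full overlap
print(serial_s, pipelined_s)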