
[Data] Add option for parallelizing post-collation data batch operations in DataIterator.iter_batches() #36842

Merged
37 commits merged into ray-project:master on Jul 7, 2023

Conversation

@scottjlee scottjlee commented Jun 26, 2023

Why are these changes needed?

Currently, the prefetch_batches arg of Dataset.iter_batches is used to configure the number of preloaded batches on both the CPU and the GPU; in the typical case where there is far more CPU memory than GPU memory, this unnecessarily constrains the number of batches that can be prefetched on the CPU.

This PR adds a separate parameter, _finalize_fn, which accepts a user-defined function that is executed in its own threadpool, so that these final post-collation steps can be overlapped with batch formatting and collation. For example, this is useful when the last step in producing a batch is a host-to-device transfer; that transfer is the default _finalize_fn used when _collate_fn is not specified. Note that when _collate_fn is provided by the user, they should also handle the host-to-device transfer themselves, outside of _collate_fn, in order to maximize performance.
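As a rough usage sketch of the recommended split (hedged: the toy dataset, column name, and batch size are illustrative, and the collate_fn signature is assumed to match Ray's iter_torch_batches API around the time of this PR):

import numpy as np
import torch
import ray

# Toy dataset; the column name "x" is purely illustrative.
ds = ray.data.from_items([{"x": float(i)} for i in range(32)])

def cpu_collate_fn(batch):
    # Keep collation CPU-only: stack/convert here, but do not touch the GPU.
    return torch.as_tensor(np.asarray(batch["x"]))

device = "cuda" if torch.cuda.is_available() else "cpu"
for batch in ds.iter_torch_batches(batch_size=8, collate_fn=cpu_collate_fn):
    # Per the note above, the host-to-device transfer is handled by the caller,
    # outside of collate_fn.
    batch = batch.to(device)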

Related issue number

Closes #35305

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Scott Lee added 2 commits June 26, 2023 16:26
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee scottjlee marked this pull request as ready for review June 27, 2023 01:09
Scott Lee added 3 commits June 26, 2023 20:18
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@@ -97,8 +98,13 @@ def iter_batches(
process. If set to greater than 0, a separate thread will be used to fetch
the specified amount of formatted batches from blocks. This improves
performance for non-CPU bound UDFs, allowing batch fetching compute and
formatting to be overlapped with the UDF. Defaults to 0 (no prefetching
enabled).
formatting to be overlapped with the UDF. Defaults to 1.
Contributor Author

updated the docs from 0 to 1, based on the current default value of 1 in the param definition.

@@ -34,6 +34,7 @@ def iter_batches(
shuffle_seed: Optional[int] = None,
ensure_copy: bool = False,
prefetch_batches: int = 1,
gpu_prefetch_batches: int = 1,
Contributor

Shouldn't this be for iter_torch_batches() only?

@scottjlee scottjlee Jun 28, 2023

DataIterator.iter_torch_batches() calls DataIterator.iter_batches(), which calls this iter_batches function in block_batching/iter_batches.py, so I believe we still need to expose this param here.
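To illustrate the delegation being described here, a simplified, hypothetical sketch (stand-in classes and functions, not Ray's actual source):

from typing import Iterator, List


def _block_level_iter_batches(blocks: List[list], prefetch_batches: int = 1) -> Iterator[list]:
    # Stand-in for the function in block_batching/iter_batches.py, where the
    # new argument would actually be consumed.
    yield from blocks


class DataIteratorSketch:
    def __init__(self, blocks: List[list]) -> None:
        self._blocks = blocks

    def iter_batches(self, prefetch_batches: int = 1) -> Iterator[list]:
        # The public iterator only forwards its arguments downward...
        return _block_level_iter_batches(self._blocks, prefetch_batches=prefetch_batches)

    def iter_torch_batches(self, prefetch_batches: int = 1) -> Iterator[list]:
        # ...and iter_torch_batches() is built on iter_batches(), so any new
        # parameter has to be plumbed through every layer.
        return self.iter_batches(prefetch_batches=prefetch_batches)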

threadpool will be used to format batches and apply the collate_fn.
Defaults to 1. You can revert back to the old prefetching behavior
that uses `prefetch_blocks` by setting `use_legacy_iter_batches` to
True in the DataContext.
Contributor

Given that it's unlikely someone would want this to be >1, I don't think the comment needs to mention the legacy behavior.

@@ -149,7 +154,7 @@ def _async_iter_batches(
stats=stats,
batch_format=batch_format,
collate_fn=collate_fn,
num_threadpool_workers=prefetch_batches,
num_threadpool_workers=gpu_prefetch_batches,
Contributor

We still want full prefetching for the format conversion right? Just not the final GPU loading step.

@ericl ericl added the @author-action-required label (The PR author is responsible for the next step. Remove tag to send back to the reviewer.) on Jun 27, 2023
@amogkam amogkam left a comment

There are 2 changes we would need to make here

  1. As @ericl mentions, we don’t want to change the existing thread pool for formatting + collate_fn, but rather use a new thread pool for the host-to-device transfer.
  2. We would also need to change our collate_fn API. Currently, the API is such that if the user specifies a collate_fn, they are expected to do the host-to-device transfer in the collate_fn. This won’t work if we want to parallelize collate_fn and the host-to-device transfer independently (a rough sketch of this split follows below).
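A generic sketch of that split, using plain threads rather than Ray's internal make_async_gen (all names below are placeholders; this is an assumption about the shape of the change, not the merged implementation):

from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def threaded_map(items: Iterable[T], fn: Callable[[T], U],
                 num_workers: int, max_in_flight: int) -> Iterator[U]:
    # Apply fn in a thread pool, keeping at most max_in_flight results pending,
    # and yield results in input order.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        pending = deque()
        for item in items:
            pending.append(pool.submit(fn, item))
            if len(pending) >= max_in_flight:
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()

def iter_batches_sketch(raw_batches, collate_fn, finalize_fn, prefetch_batches=4):
    # (1) Formatting + collate_fn: a multi-threaded CPU stage, sized by prefetch_batches.
    collated = threaded_map(raw_batches, collate_fn,
                            num_workers=prefetch_batches, max_in_flight=prefetch_batches)
    # (2) Host-to-device transfer (finalize_fn): its own single-threaded stage,
    # so it no longer caps how many batches the CPU side can prefetch.
    yield from threaded_map(collated, finalize_fn, num_workers=1, max_in_flight=1)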

Scott Lee added 2 commits June 28, 2023 14:37
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>

scottjlee commented Jun 28, 2023

@amogkam @ericl I tried separating the formatting and collate steps, as suggested by Amog in (1) above. For (2), changing the collate_fn API to allow parallelization independent of formatting, where would be the best place to make this change? Would we need to call torch.as_tensor(...) over each batch with the device specified, after we format but before we collate?
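For reference, a minimal sketch of what per-column device placement with torch.as_tensor could look like (the batch contents and helper name are hypothetical):

import numpy as np
import torch

def move_batch_to_device(batch: dict, device: torch.device) -> dict:
    # Convert each column of a formatted batch into a tensor on the target device;
    # torch.as_tensor avoids an extra copy when dtype and device already match.
    return {col: torch.as_tensor(arr, device=device) for col, arr in batch.items()}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch = {"image": np.zeros((8, 3, 32, 32), dtype=np.float32), "label": np.arange(8)}
gpu_batch = move_batch_to_device(batch, device)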

@amogkam amogkam left a comment

Sorry, I meant that the collate_fn should run in the CPU-based threadpool, along with formatting. Only the host-to-device transfer should happen in the GPU-based threadpool.


ericl commented Jun 28, 2023

How about we have a collate_fn and finalize_fn (could be marked internal argument), and say the finalize_fn has concurrency 1 always? We can put things like H2D in the finalize_fn.

I don't think you'd ever need more than 1 prefetch for the H2D load.


amogkam commented Jun 28, 2023

in that case we don't need gpu_prefetch_batches

Scott Lee added 4 commits June 28, 2023 17:53
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@amogkam amogkam left a comment

Lgtm, thanks! Can we add a couple tests?

  1. Test that finalize_fn is not run in more than 1 thread. We can use a threading.Lock to test this (see the sketch after this list).
  2. Test the logic for the different combinations of collate_fn and finalize_fn, and when the defaults are used.
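A minimal sketch of the Lock-based check in item 1 (hedged: passing the function via the internal _finalize_fn argument of DataIterator.iter_batches is an assumption based on this PR's description):

import threading
import ray

def test_finalize_fn_single_threaded():
    lock = threading.Lock()
    overlapping_calls = []

    def finalize_fn(batch):
        # Non-blocking acquire: if a second thread is ever inside finalize_fn
        # concurrently, the acquire fails and the overlap is recorded.
        if not lock.acquire(blocking=False):
            overlapping_calls.append(True)
            return batch
        try:
            return batch  # stand-in for the real host-to-device transfer
        finally:
            lock.release()

    it = ray.data.range(100).iterator()
    # _finalize_fn is the internal argument introduced in this PR (assumed signature).
    for _ in it.iter_batches(batch_size=10, _finalize_fn=finalize_fn):
        pass

    assert not overlapping_calls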

@@ -180,6 +180,32 @@ def collate(
yield CollatedBatch(batch.batch_idx, collated_batch)


def finalize_batches(
Contributor

Hmm, you could call make_async_gen twice in the same function, right?

def collate_and_finalize(...):
    collated_iter = make_async_gen(base_iter, num_workers=prefetch_batches)
    finalized_iter = make_async_gen(collated_iter, num_workers=1)
    return finalized_iter

Scott Lee added 5 commits July 3, 2023 13:25
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee scottjlee requested a review from amogkam July 4, 2023 00:27
@amogkam amogkam self-assigned this Jul 4, 2023
@amogkam amogkam left a comment

nice work! just some final comments

"appropriate dtype and device."
"collate_fn cannot be used with dtypes and device."
"You should manually move the output Torch tensors to the"
"desired dtype and device, outside of collate_fn."
Contributor

Suggested change
"desired dtype and device, outside of collate_fn."
"desired dtype and device outside of collate_fn."

@@ -193,7 +201,7 @@ def _format_in_threadpool(
num_threadpool_workers: The number of threads to use in the threadpool.
Contributor

Add finalize_fn to the docstring

finalized_iter = make_async_gen(
base_iterator=collated_iter,
fn=threadpool_computations_finalize_fn,
num_workers=1,
Contributor

Actually, I don't think we need this case at all. We should just be able to call threadpool_computations_finalize_fn(collated_iter) directly in all cases, since the whole thing is being run in a separate thread anyway.

Otherwise, we would be prefetching 1 extra batch beyond what's specified in prefetch_batches.

Scott Lee added 2 commits July 4, 2023 21:53
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee scottjlee requested a review from amogkam July 5, 2023 15:57
amogkam added 7 commits July 5, 2023 10:55
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
@amogkam amogkam merged commit e55e1fe into ray-project:master Jul 7, 2023
scottjlee added a commit to scottjlee/ray that referenced this pull request Jul 10, 2023
…ons in `DataIterator.iter_batches()` (ray-project#36842)

@scottjlee scottjlee mentioned this pull request Jul 10, 2023
scottjlee added a commit to scottjlee/ray that referenced this pull request Jul 10, 2023
…ons in `DataIterator.iter_batches()` (ray-project#36842)

@scottjlee scottjlee mentioned this pull request Jul 10, 2023
bveeramani pushed a commit that referenced this pull request Jul 12, 2023
…ons in `DataIterator.iter_batches()` (#36842) (#37260)

arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
…ons in `DataIterator.iter_batches()` (ray-project#36842)

Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[data] Set the prefetch depth separately for GPU-preloading in iter_batches
5 participants