
[Datasets] Async Batch Prefetching Part 1/2 #30190

Closed · wants to merge 18 commits

Conversation

@amogkam (Contributor) commented Nov 10, 2022

Signed-off-by: Amog Kamsetty amogkamsetty@yahoo.com

Part 1 of async batch prefetching. Implements batch fetching in a separate thread for GPU UDFs in map_batches and iter_batches. This allows CPU based batch fetching to be overlapped with the UDF computation.

async_fetch_batch is added as an argument to map_batches and iter_batches. By default, this is set to False.

We do not add it to DatasetContext because this functionality needs to be configured for each map_batches/iter_batches call independently rather than globally for the entire Dataset: a Dataset workflow might contain some transformations that run on GPU and others that run on CPU.

This is part 1 of the PR and only addresses the case where batch size is less than block size. This PR does not fetch batches across block boundaries.
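
For illustration, here is a minimal usage sketch of the async_fetch_batch flag described above (the dataset and UDF are placeholders, and the flag is the API as proposed in this PR; it was later reworked into prefetch_batches per the review discussion below):

```python
import ray

def gpu_udf(batch):
    # Placeholder for a GPU model forward pass; returns the batch unchanged.
    return batch

ds = ray.data.range(16_000)

# map_batches: fetch/format the next batch on a separate thread while the
# UDF runs on the current one.
ds = ds.map_batches(gpu_udf, batch_size=256, async_fetch_batch=True)

# iter_batches: prefetch the next batch while the caller consumes this one.
for batch in ds.iter_batches(batch_size=256, async_fetch_batch=True):
    pass
```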

With both parts implemented, we see GPU prediction throughput increase from ~260 images/sec to ~300 images/sec:

No prefetching:

Total images 16232
Times for each stage:  {'read': 15.336565732955933, 'preprocess': 6.303653955459595, 'predict': 62.256098985672}
Throughput for each stage:  {'read': '1058.3855787948612 (img/sec)', 'preprocess': '2575.0144463341717 (img/sec)', 'predict': '260.72947493442746 (img/sec)'}
Total time:  83.89631867408752
Throughput 193.47690407080358 (img/sec)

With prefetching:

Total images 16232
Times for each stage:  {'read': 16.441548347473145, 'preprocess': 5.674700975418091, 'predict': 54.01595449447632}
Throughput for each stage:  {'read': '987.2549505043818 (img/sec)', 'preprocess': '2860.415036900528 (img/sec)', 'predict': '300.5038076603809 (img/sec)'}
Total time:  76.13220381736755
Throughput 213.20806683776962 (img/sec)

Why are these changes needed?

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jiaodong (Member)

simple but powerful optimization indeed :)

A few things:

  • Overlapping IO with compute should be a common pattern we enable in Ray Datasets. Are there other places that also need this, and what's the best way to surface this pattern in other consumer-facing APIs (e.g., map_batches and iter_batches)?
  • It would be great to try out the dashboard's profiling for a before & after comparison in the PR summary.

amogkam and others added 8 commits November 21, 2022 18:03
@amogkam changed the title from "[Draft] [Datasets] Async prefetching for map_batches" to "[Draft] [Datasets] Async prefetching for Batcher" on Dec 1, 2022
@amogkam force-pushed the dataset-prefetching branch from e5abcc1 to 43751b5 on December 12, 2022 21:17
@amogkam changed the title from "[Draft] [Datasets] Async prefetching for Batcher" to "[Datasets] Async Batch Prefetching Part 1/2" on Dec 12, 2022
@amogkam amogkam marked this pull request as ready for review December 13, 2022 01:15
@clarkzinzow (Contributor) left a comment

Design question: At the API level, could we consolidate prefetch_blocks and async_fetch_batch with a single prefetch_batches argument that gives the number of batches to asynchronously prefetch?

Motivation

I think this will be a lot cleaner from an end-user perspective, with a single argument controlling prefetching behavior that has clear batch-based semantics. We ultimately care about pipelining upstream batch preparation with downstream batch computation, and both prefetching batches over the network and background batch slicing/concatenation/formatting fall under the former, so having a single batch-based argument controlling how many batches we want to prepare while a single batch is being processed seems like better UX. IMO, the object store transfer prefetching should probably be an implementation detail in facilitating a generic "how many batches do we want to prepare in the background on this worker" API, similar to the semantics of the tf.data prefetching API.

Do We Need Two Knobs?

Since this seems like nicer UX, the only reasons that we'd want to have two knobs (that I can think of) are:

  1. We want to have different levels of prefetching for object store transfers compared to background slicing/concatenation/format conversion.
  2. The prefetching unit for the two args should be different (e.g. batch vs. block).

(1) could be true. If we assume that we're not hitting NIC bandwidth or CPU bottlenecks during deserialization or batch prep, then prefetching object store transfers is ultimately limited by the node's object store capacity, while background slicing/concatenation/format conversion is ultimately limited by the node's worker heap, which are separate resource pools with different capacities. This is further complicated by format conversions leading to memory inflation. E.g. for a Pandas batch format UDF on Arrow blocks executed on a node that has equal object store and worker heap allocations, you could imagine that we want to prefetch 8 batches to the object store, but only do 1 background batch slice and format conversion since the Pandas format conversion results in a DataFrame that's 4x larger than the Arrow data (and the UDF will need to at least allocate an output in the worker heap as well); this inflation could easily happen for boolean data, for example, which is bitpacked in Arrow but is not in Pandas.
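
As a rough illustration of the boolean-inflation point (assuming pyarrow and pandas are installed; exact sizes vary by version):

```python
import pyarrow as pa

arr = pa.array([True, False] * 1_000_000)  # 2M booleans, bit-packed in Arrow
print(arr.nbytes)                          # ~250 KB (1 bit per value)

series = arr.to_pandas()                   # pandas bool column, 1 byte per value
print(series.to_numpy().nbytes)            # ~2 MB, i.e. ~8x inflation
```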

However, this may be a non-issue if:

  • the Python worker heap is typically the bottleneck rather than the node's object store (I think this is true for transformations/trainers that don't use a NumPy/Arrow narrow waist); or
  • we can rely on Ray Core's object pull manager to do proper backpressure on object transfers (this should be true).

The user can set prefetch_batches tailored to whatever eliminates compute resource stalls subject to a Python worker heap cap, and object transfer prefetching will do a best-effort fulfillment of prefetch_batches, subject to Ray Core's backpressure on object transfers when the node's object store pressure gets high. I'm thinking that this might make (1) a non-issue.

(2) is the status quo, but I think that the current object store prefetching API would be better expressed in terms of batches (this has been a long-standing TODO), so I don't think that (2) is a concern.

Semantics and Implementation

For .map_batches(), this prefetch_batches arg would have the existing async_fetch_batch semantics, except that the buffer size is configurable. Down the line, we might have this arg control the number of blocks that are prefetched by actor workers under the actor compute strategy, rather than that being a strategy config.

For .iter_batches(), this prefetch_batches arg would have the existing async_fetch_batch semantics along with exposing the buffer size, and would control how many blocks are prefetched into the object store on the consumer/worker node. Namely, if we know the number of rows in a block from the readily-available block metadata (this should be defined in the common case), we would prefetch N blocks such that N is the smallest number of blocks that satisfies prefetch_batches, i.e. we choose an N such that sum(b.num_rows for b in blocks[:N-1]) < prefetch_batches * batch_size <= sum(b.num_rows for b in blocks[:N]). If the block metadata doesn't have the number of rows in the block (e.g. the dataset is a lazy read of CSV files, for which we don't know the number of rows until the entire file has been read), then we can prefetch one block at a time.
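
To make the block-count selection concrete, an illustrative sketch (not code from this PR; names are hypothetical and block_num_rows would come from block metadata):

```python
from itertools import accumulate
from typing import List

def num_blocks_to_prefetch(
    block_num_rows: List[int], prefetch_batches: int, batch_size: int
) -> int:
    """Smallest N such that the first N blocks cover prefetch_batches * batch_size rows."""
    target = prefetch_batches * batch_size
    if target <= 0:
        return 0
    for n, total in enumerate(accumulate(block_num_rows), start=1):
        if total >= target:
            return n
    return len(block_num_rows)  # not enough rows remaining; prefetch everything

# Example: blocks of 100 rows each, prefetching 2 batches of 256 rows -> N = 6.
assert num_blocks_to_prefetch([100] * 10, prefetch_batches=2, batch_size=256) == 6
```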

What do y'all think? cc @jiaodong

@ericl (Contributor) commented Dec 13, 2022 via email

@amogkam (Contributor, Author) commented Dec 13, 2022

Agreed for the most part. The batch/block prefetching consolidation is addressed in a future PR. This PR is just part 1.

But I’m not sure I agree that we should expose the buffer size to the user. For pretty much all practical cases, I don’t think a larger buffer size will increase throughput; it will only increase heap memory usage.

@clarkzinzow (Contributor)

> Agreed for the most part. The batch/block prefetching consolidation is addressed in a future PR. This PR is just part 1.

Ah I see, I didn't realize that's what you meant by "prefetching across block boundaries", but that interpretation is obvious in hindsight!

> But I’m not sure I agree that we should expose the buffer size to the user. For pretty much all practical cases, I don’t think a larger buffer size will increase throughput; it will only increase heap memory usage.

This is essentially saying that the latency of a training step (or forward pass, for inference) on a single batch will never be less than the latency of fetching and preparing a single batch, which probably won't be the case for small batches and/or lightweight models, whose training step (or forward pass) times can be on the order of milliseconds. The object transfer overhead can dominate for small batches + small blocks, and the batch slicing/concatenation/formatting can dominate for lightweight models.

This might not be important right now since background batch prep is currently single-threaded, but I'm imagining that we will eventually expand this "background worker" concept to have threadpool and/or local CPU actor implementations, allowing for preparation of multiple batches in parallel. And if we eventually decide to push more last-meter preprocessing into this local background worker (e.g. per-batch CV transformations) thereby making the prep more expensive, we'll want to make the buffer size configurable, at least at the Datasets layer.

@amogkam (Contributor, Author) commented Dec 13, 2022

Discussed offline with @clarkzinzow and @c21.

The decision is to go with a prefetch_batches: int argument in map_batches and iter_batches, allowing users to configure the buffer size, rather than just a True/False boolean.

In Datasets, this will default to 0.

In AIR components, we will hard-code it to 1 for GPU batch prediction and training ingest.
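
For concreteness, a sketch of what the agreed-upon shape would look like from the user side (illustrative only; the argument did not exist at the time of this PR):

```python
import ray

ds = ray.data.range(10_000)

# prefetch_batches=0 (the proposed Datasets default) keeps today's serial behavior;
# prefetch_batches=1 prepares one batch in the background, as AIR would hard-code.
for batch in ds.iter_batches(batch_size=256, prefetch_batches=1):
    pass
```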

@amogkam amogkam requested a review from clarkzinzow December 14, 2022 01:29
return batch


class AsyncBatcher(BatcherInterface):
Contributor

Design note: Rather than implementing the BatcherInterface with async/background batching being an implementation detail, where the AsyncBatcher ends up mostly being a proxy to the base batcher, I'm wondering if it would be cleaner to lift the backgrounding of the batcher out of the abstraction, e.g. where the background thread and queue are defined outside of the batcher, and batcher.next() is called in the background thread. This is a common design pattern of separating the operation (in this case batch preparation) from the execution and operator-to-operator communication model (background thread and queue), and should make it easier to change/add more operations and tweak the execution/communication model.

If we refactor the batch producer (batch_blocks) code to be a chained sequence of Iterable[Data] -> Iterable[Data] generators, I think we can more cleanly separate the data processing code (e.g. prefetching, batching, formatting, etc.) from the execution/orchestration code (serial vs. background thread vs. threadpool vs. actor, and iterator vs. queue); this is similar to the Torch IterDataPipe paradigm, and they essentially had the same motivations as us. We could then have an Iterable[ray.ObjectRef[Block]] -> Iterable[Block] (pre)fetcher, an Iterable[Block] -> Iterable[Batch] batcher, and an Iterable[Batch] -> Iterable[Batch] formatter.

With --> representing a simple iterable feed (no queueing/buffering), the status quo (no batcher background thread) is

producer -(plasma_queue)-> fetcher --> batcher --> formatter --> consumer

where it's single-threaded (except for the Ray Core object store producer, which is multithreaded) and serial. This PR would be changing this to

producer -(plasma_queue)-> fetcher --> [batcher --> formatter] -(queue)-> consumer

where everything in [] is in a background thread. Explicitly representing this in the batch_blocks code would make it easier to tweak this going forward, e.g. moving the fetcher into the background thread as well, doing some of the fetcher --> batcher --> formatter chain in parallel over a threadpool, etc.
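
For concreteness, a minimal, self-contained sketch of this chained-generator shape (all operator names are hypothetical, and the toy list "blocks" stand in for real object store fetches and Arrow/pandas blocks):

```python
import queue
import threading
from typing import Iterable, Iterator, List

Block = List[int]
Batch = List[int]

def fetcher(block_refs: Iterable[Block]) -> Iterator[Block]:
    # Iterable[ref] -> Iterable[Block]: stand-in for object store (pre)fetching.
    yield from block_refs

def batcher(blocks: Iterable[Block], batch_size: int) -> Iterator[Batch]:
    # Iterable[Block] -> Iterable[Batch]: slice/concatenate rows into batches.
    buf: List[int] = []
    for block in blocks:
        buf.extend(block)
        while len(buf) >= batch_size:
            yield buf[:batch_size]
            buf = buf[batch_size:]
    if buf:
        yield buf

def formatter(batches: Iterable[Batch]) -> Iterator[Batch]:
    # Iterable[Batch] -> Iterable[Batch]: stand-in for batch format conversion.
    yield from batches

def background(it: Iterable[Batch], maxsize: int = 1) -> Iterator[Batch]:
    # Drain `it` on a single background thread, handing results over a bounded queue.
    q: "queue.Queue" = queue.Queue(maxsize=maxsize)
    sentinel = object()

    def work() -> None:
        for item in it:
            q.put(item)
        q.put(sentinel)

    threading.Thread(target=work, daemon=True).start()
    while (item := q.get()) is not sentinel:
        yield item

blocks = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
# Status quo: fetcher --> batcher --> formatter, all serial.
# This PR:    [batcher --> formatter] runs on a background thread behind a queue.
for batch in background(formatter(batcher(fetcher(blocks), batch_size=4))):
    print(batch)  # [1, 2, 3, 4], [5, 6, 7, 8], [9]
```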

What do you think? And is this somewhat what your part 2 PR looks like?

Contributor

This structuring should also be a lot easier to incorporate into the upcoming execution model overhaul.

Contributor Author

Yes, this is what part 2 of the PR looks like!

@clarkzinzow (Contributor) · Dec 15, 2022

Would it be reasonable to bring this part 1 PR closer to that paradigm? Otherwise we're essentially adding an AsyncBatcher abstraction (which I think we agree is not the design that we'd like now that we're introducing asynchrony here) in this PR only to remove it in the follow-up stacked PR.

I'm a big fan of the "make the change easy, then make the easy change" principle, and I think that the sequence here would be:

  1. Refactor to the chained generators pattern.
  2. Push batching and formatting into a background thread, with formatted batches pushed onto a queue.
  3. Unify the block prefetching with batch prefetching.

Where (1) and (2) can probably be the same PR (i.e. this PR) if the diff is small enough and is pretty easy to review.

@amogkam (Contributor, Author) · Dec 15, 2022

Even in the follow-up there is an AsyncBatcher abstraction, but the dataflow pattern is still Iterable-based.

producer -(plasma_queue)-> fetcher --> AsyncBatcher([batcher --> formatter] -(queue)) --> consumer

Basically, AsyncBatcher abstracts the batcher, formatter, and queue part of this flow.

Contributor Author

Are you recommending removing the Batcher concept altogether?

Contributor Author

Sure, we can go with the 3-step approach that you mentioned...

but the Batcher class that does Iterable[Block] -> Iterable[Batch] would still be useful so that we don't have a bunch of loose generator functions hanging around.

Contributor

Ah, so I'm contending that the AsyncBatcher abstraction is probably not the design we want since it's a tight coupling of the execution model + the queueing logic between the operators with the implementation of a single (batcher + formatter) operator. That coupling will make it difficult to change or add more operators and tweak the execution/queueing models, to unit test these different pockets of logic, and to reason through the execution/queueing model.

raise RuntimeError(
"Fetching the next batch took longer than "
f"{self.fetch_timeout_s} seconds"
)
Contributor

Hmm I don't think we should have a timeout defined; the existing semantics should be maintained, where we wait for as long as it takes to prepare the batch. Any reason that we'd want to add a timeout that I'm not thinking of?

Contributor Author

This is really meant as an emergency safety hatch to avoid infinite hanging in case something goes terribly wrong.

The default value is already very high, but I can increase it if you think that would be better.

)

# Prefetch the next batch before returning the current one.
fetch_thread = threading.Thread(target=self._prefetch_next_batch)
Contributor

Could we have a single background thread rather than starting a new thread for every prefetched batch? The former is the more common "do work in a background thread" pattern, makes the thread lifecycle very clear (background thread lives until there are no batches left), and avoids the OS-level overhead of starting a thread for each batch.
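
A hypothetical sketch of the suggested shape: one long-lived background thread that keeps filling a bounded queue, instead of spawning a new thread for every prefetched batch:

```python
import queue
import threading

class _BackgroundPrefetcher:
    """Wraps a batch iterator; a single daemon thread fills a bounded buffer."""

    _SENTINEL = object()

    def __init__(self, batch_iter, buffer_size: int = 1):
        self._queue: "queue.Queue" = queue.Queue(maxsize=buffer_size)
        self._thread = threading.Thread(
            target=self._fill, args=(batch_iter,), daemon=True
        )
        # The thread lives until batch_iter is exhausted (no per-batch threads).
        self._thread.start()

    def _fill(self, batch_iter):
        for batch in batch_iter:
            self._queue.put(batch)  # blocks when the buffer is full
        self._queue.put(self._SENTINEL)

    def __iter__(self):
        while (batch := self._queue.get()) is not self._SENTINEL:
            yield batch

# Usage with any batch-producing iterator (names here are hypothetical):
for batch in _BackgroundPrefetcher(iter(["batch0", "batch1", "batch2"])):
    print(batch)
```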

Contributor Author

good point, will make the change

amogkam added a commit that referenced this pull request Jan 10, 2023
As discussed offline with @clarkzinzow (#30190 (comment)), this PR refactors block batching to follow a chained iterators pattern.

This allows for more flexibility, composability, and better testing of components upstream of Iterator[Block] (formatting, shuffling, batching, prefetching).

This PR only does a refactor and adds tests. There are no API or functionality changes in this PR. This PR also consolidates the map_batches and iter_batches codepaths.

Signed-off-by: amogkam <amogkamsetty@yahoo.com>
@amogkam (Contributor, Author) commented Jan 10, 2023

Superseded by #31576

@amogkam amogkam closed this Jan 10, 2023
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023