
[RFC] [AIR] Batch size for preprocessor transforms #29229

Closed
amogkam opened this issue Oct 11, 2022 · 29 comments
Labels: RFC, stale

Comments

@amogkam (Contributor) commented Oct 11, 2022

User workflow

Workflow 1: I am doing GPU prediction. I also have a preprocessor that has high heap memory pressure leading to OOMs with too much parallelism or large batch sizes. I want to either decrease my batch size for preprocessing, or reduce my preprocessing parallelism by requesting more CPUs for each map_batches task.

Workflow 1, variant 1: Same as above, except my GPU memory (GRAM) is much smaller than my RAM, so I need to use a smaller batch size for prediction than for preprocessing.

Workflow 2: I am doing training and am passing a Preprocessor to my Trainer. Same as for prediction: my UDF has high heap memory pressure, leading to OOMs with the default batch size. I would like to decrease my batch size to prevent OOMs.

Current State

Prediction:
When doing GPU prediction with CPU preprocessing, these two stages cannot be fused, as each operation has different resource requirements. Currently, users can configure prediction (things like batch size and the number of resources per task/actor), but for preprocessing, the default args for map_batches are used and cannot be configured. This forces a batch size of 4096, which may be too large for certain preprocessors.

https://github.com/ray-project/ray/blob/master/python/ray/train/batch_predictor.py#L226

Training:
We call preprocessor.transform which calls dataset.map_batches with the default batch size of 4096. There is no way to configure this.

https://github.com/ray-project/ray/blob/master/python/ray/data/preprocessor.py#L253
https://github.com/ray-project/ray/blob/master/python/ray/data/preprocessor.py#L255

Proposal

Make preprocessing batch size configurable.

Option 1: Configure at the higher-level Trainer and BatchPredictor

Add a batch_size arg to Preprocessor.transform.

Add configuration options at the Trainer and BatchPredictor level which will forward to Preprocessor.transform.

More concretely:

  1. Add an override_preprocess_batch_size arg to BatchPredictor.predict to use as the batch size for the preprocessing map_batches calls. If not doing GPU prediction, this arg is ignored, as preprocessing and prediction are fused into a single stage. If not specified, the same batch size will be used for prediction and preprocessing.
  2. For API consistency, we should also introduce an equivalent way to configure the preprocessing batch size for training: add a transform_batch_size arg to DatasetConfig, which will pipe through to Preprocessor.transform.

Pros:

  • A different batch size can be used for preprocessing during training vs. preprocessing during prediction. This is useful for tuning the batch size independently for each use case, since the cluster configurations for training and prediction are often different (and RAM may differ between them).

Cons:

  • The API is clunky: batch size is configured in different places for training vs. prediction vs. standalone preprocessors.
  • Batch size cannot be configured independently for each preprocessor in a chained setting.

New API after the changes:

Addition of batch_size arg to Preprocessor.transform:

def transform(self, dataset: Dataset, batch_size: int = 4096) -> Dataset:

Addition of preprocess_batch_size arg to BatchPredictor:

 def predict(
        self,
        data: Union[ray.data.Dataset, ray.data.DatasetPipeline],
        *,
        feature_columns: Optional[List[str]] = None,
        keep_columns: Optional[List[str]] = None,
        batch_size: Optional[int] = None,
        preprocess_batch_size: Optional[int] = None,
        min_scoring_workers: int = 1,
        max_scoring_workers: Optional[int] = None,
        num_cpus_per_worker: Optional[int] = None,
        num_gpus_per_worker: Optional[int] = None,
        separate_gpu_stage: bool = True,
        ray_remote_args: Optional[Dict[str, Any]] = None,
        **predict_kwargs,
    ) -> ray.data.Dataset:

Addition of transform_batch_size to DatasetConfig:

fit: Optional[bool] = None
split: Optional[bool] = None
required: Optional[bool] = None
transform: Optional[bool] = None
transform_batch_size: int = 4096
use_stream_api: Optional[bool] = None
stream_window_size: Optional[float] = None
global_shuffle: Optional[bool] = None
randomize_block_order: Optional[bool] = None
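For concreteness, a minimal usage sketch of how Option 1 would look from the caller's side. The checkpoint, predictor class, and dataset are placeholders, and preprocess_batch_size is the proposed addition, so this is illustrative rather than runnable today:

from ray.train.batch_predictor import BatchPredictor
from ray.train.torch import TorchPredictor  # placeholder predictor class

# Hypothetical Option 1 usage: prediction and preprocessing batch sizes are
# configured independently at the BatchPredictor level.
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, TorchPredictor)
predictions = batch_predictor.predict(
    dataset,
    batch_size=256,              # batch size for the prediction stage
    preprocess_batch_size=1024,  # proposed arg, forwarded to Preprocessor.transform
    num_gpus_per_worker=1,
)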

Option 2: Tie batch size to the Preprocessor instance

Tie batch_size to the preprocessor via an arg passed to Preprocessor.__init__. This is already being done for BatchMapper (#29193), but we would need to extend it to all Preprocessors.

Pros:

  • Cleaner API. Batch size is configured for each preprocessor and in only one place.
  • Can configure batch size independently for each preprocessor

Cons:

  • Cannot use a different batch size for training vs. prediction. The available resources and cluster configuration for training vs. prediction may be different, so you might want to tune each independently depending on available RAM.
  • transform_batch_size doesn't make sense for the online serving case. There's batching at the Ray Serve level that's independent of this.

New API after the changes:

Preprocessor would no longer be an abstract class. Add an __init__ method to Preprocessor that takes a transform_batch_size.

class Preprocessor:
    def __init__(self, transform_batch_size: int = 4096):
        ...
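For illustration, a rough sketch of how Option 2 might look from the user's side, assuming transform_batch_size is threaded through the constructors of the concrete preprocessors; the column names and the dataset ds are placeholders:

from ray.data.preprocessors import Chain, MinMaxScaler, StandardScaler

# Hypothetical Option 2 usage: each preprocessor carries its own batch size,
# configured in exactly one place.
scaler = StandardScaler(columns=["a"], transform_batch_size=1024)
minmax = MinMaxScaler(columns=["b"])  # falls back to the default of 4096
chain = Chain(scaler, minmax)

# Each step of the chain would use its own batch size during transform.
transformed_ds = chain.transform(ds)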
@richardliaw (Contributor)

(meta: big fan of this format btw).

I wonder if it's possible to configure the preprocessor directly rather than making this batch_predictor a godconfig?

@ericl (Contributor) commented Oct 11, 2022

I think my main concern here is having two separate configs. Could we make it use the existing batch size config for both by default, and then call the new flag override_preprocess_batch_size to make the use case clear?

Btw, I don't think None is a great default. It may be better to make it mandatory as an argument.

@clarkzinzow (Contributor)

batch_size=None also has a specific meaning for ds.map_batches(), where batching is disabled, i.e. the entire underlying block is provided as a batch. Two questions here:

  1. How would we map batch_size=None to the Datasets default for ds.map_batches()? We'd essentially need to ensure that it's not passed as a kwarg.
  2. This would disallow the "disable batching" semantics that are currently tied to batch_size=None in Datasets; are we ok with this?
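One way to address (1), sketched below, is to only forward batch_size when the caller actually set it, so the Datasets default stays in effect and an explicit batch_size=None keeps its "disable batching" meaning (the sentinel and helper are hypothetical, not existing code):

# Sentinel distinguishing "caller did not pass batch_size" from an explicit
# batch_size=None (which means "disable batching" in ds.map_batches()).
_UNSET = object()

def _map_batches_kwargs(batch_size=_UNSET) -> dict:
    kwargs = {}
    if batch_size is not _UNSET:
        kwargs["batch_size"] = batch_size  # may be an int or an explicit None
    return kwargs

# ds.map_batches(fn, **_map_batches_kwargs())      -> Datasets default applies
# ds.map_batches(fn, **_map_batches_kwargs(None))  -> batching disabled
# ds.map_batches(fn, **_map_batches_kwargs(1024))  -> explicit batch size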

@clarkzinzow (Contributor)

Also, +1 to @ericl's suggestion of sharing batch_size between the preprocessor and predictor by default, and having preprocessor_batch_size override that sharing. I think we could still have:

        batch_size: Optional[int] = None,
        preprocess_batch_size: Optional[int] = None,

where the docstring for preprocess_batch_size indicates that, if not specified, batch_size will be used (i.e. I don't think we need to name it override_preprocess_batch_size, which is quite verbose). I think that I've seen this "override if given" pattern elsewhere in ML libs.
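The "override if given" resolution itself is tiny; a sketch under the proposed argument names:

def resolve_preprocess_batch_size(batch_size, preprocess_batch_size):
    # Hypothetical helper: preprocess_batch_size overrides batch_size when
    # given; otherwise the predictor's batch_size is shared with preprocessing.
    return preprocess_batch_size if preprocess_batch_size is not None else batch_size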

amogkam changed the title from "[RFC] BatchPredictor preprocessing configurability" to "[RFC] [AIR] Batch size for preprocessors" Oct 11, 2022
@amogkam (Contributor, Author) commented Oct 11, 2022

@richardliaw good point, added it as an option

@ericl @clarkzinzow removed the section about the defaults. We can make that decision separately in a future RFC. Also renamed to override_preprocess_batch_size

@amogkam (Contributor, Author) commented Oct 11, 2022

Updated the issue- can you guys take another look? Thanks!

amogkam changed the title from "[RFC] [AIR] Batch size for preprocessors" to "[RFC] [AIR] Batch size for preprocessor transforms" Oct 11, 2022
@Yard1 (Member) commented Oct 11, 2022

This may be too radical, but how about using a context manager to set batch size for all preprocessor operations inside the context?

For 2, can we not just have another argument for the training batch size? If not set, it will use the transform batch size. We can determine whether we are in the middle of training by, e.g., setting a global flag.
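A rough sketch of what the context-manager idea from the first paragraph could look like (entirely hypothetical; it assumes Preprocessor.transform would consult the override before falling back to its default):

import contextvars
from contextlib import contextmanager

# Hypothetical override that Preprocessor.transform would read.
_batch_size_override = contextvars.ContextVar(
    "preprocessor_batch_size_override", default=None
)

@contextmanager
def preprocessor_batch_size(batch_size: int):
    """All preprocessor transforms inside this context use `batch_size`."""
    token = _batch_size_override.set(batch_size)
    try:
        yield
    finally:
        _batch_size_override.reset(token)

# Usage sketch:
# with preprocessor_batch_size(1024):
#     predictor.predict(ds)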

@clarkzinzow (Contributor) commented Oct 11, 2022

> For API consistency, we should also introduce an equivalent way to configure the preprocessing batch size for training: add a transform_batch_size arg to DatasetConfig, which will pipe through to Preprocessor.transform.

@amogkam We need to be able to set batch_size at a per-preprocessor level, since different preprocessors will have different batch size requirements (e.g. differing peak memory utilization in relation to batch size, differing optimal batch size for performance/throughput). That should be added as a con for option (1) and a pro for option (2), and this might end up being table stakes.

@jiaodong (Member) commented Oct 11, 2022

Vote for Option 2, with the following high-level assumptions:

  • Each preprocessor should have a default batch_size
  • Different preprocessors can have different optimal batch_size depending on the workload
  • Preprocessor's batch_size is by default preserved across training and prediction
  • Trainer / Predictor act as the consumers of the dataset; each has its own batch_size requirement that does not overlap with the preprocessor batch sizes.
  • The consumer (Trainer / Predictor) can always get and override its preprocessor's batch_size if needed.
  • For optimal performance, it's expected that all preprocessor and consumer batch sizes are tuned.

Concrete example:

Preprocessor -> A, B
Trainer -> T
Predictor -> P
Checkpoint -> C

E2E Workflow:

  • A() -> B(batch_size=1024) -> T(batch_size=128) -> C
    • This leads to A with batch_size = 4096, B with batch_size = 1024, and T with 128, all persisted in the checkpoint
    • Upon checkpoint resume, the user can train with T(batch_size=256), which by default uses A=4096, B=1024 for preprocessing
  • C -> P(batch_size=512)
    • This by default uses A=4096, B=1024, P=512
    • User can optionally fetch A & B to change their batch size to fine tune e2e throughput
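Translating that workflow into a rough code sketch (PreprocessorA, PreprocessorB, SomeTrainer, SomePredictor, and the datasets are placeholders for the hypothetical Option 2 API):

# Hypothetical Option 2 flow: A and B carry their own transform batch sizes,
# while the trainer/predictor batch sizes are separate consumer-side settings.
a = PreprocessorA()                           # assumed default: transform_batch_size=4096
b = PreprocessorB(transform_batch_size=1024)  # B overrides its own batch size
chain = Chain(a, b)

trainer = SomeTrainer(preprocessor=chain, datasets={"train": train_ds})
result = trainer.fit()                        # A=4096, B=1024 persisted in the checkpoint

# Later, possibly on a different cluster:
predictor = BatchPredictor.from_checkpoint(result.checkpoint, SomePredictor)
predictor.predict(test_ds, batch_size=512)    # P=512; A and B keep 4096 / 1024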

@amogkam (Contributor, Author) commented Oct 11, 2022

@clarkzinzow yeah that's a good point, updated the rfc

@amogkam (Contributor, Author) commented Oct 11, 2022

@jiaodong yeah that's a good point.

So basically it seems like Option 2 is the best approach.
To change the batch size for prediction, the user can modify the preprocessor stored in the checkpoint with a new batch size.

The missing piece here is that with Option 2 we also need a setter for the batch size: preprocessor.set_batch_size()

So the user can do something like this:

preprocessor = Preprocessor(transform_batch_size=1024)
trainer = Trainer(preprocessor=preprocessor)
result = trainer.fit()
result.checkpoint.to_uri(s3_path)

In a separate prediction cluster, change the batch size of the preprocessor for prediction

checkpoint = Checkpoint.from_uri(uri_path)
preprocessor = checkpoint.get_preprocessor()
preprocessor.set_transform_batch_size(256)
predictor = BatchPredictor.from_checkpoint(checkpoint)

@ericl (Contributor) commented Oct 11, 2022 via email

@richardliaw (Contributor)

The per-preprocessor API is perhaps too much boilerplate/configuration complexity.

The main problem with too small a batch size is the host-to-device (h2d) transfer, right? Though it's not clear that will necessarily be the bottleneck.

@clarkzinzow (Contributor)

> I actually don't think it's a good idea to support per preprocessor batch size. The implementation complexity of this is very high, especially if it breaks fusion across preprocessing stages.

@ericl This wouldn't break fusion across preprocessing stages, we can (and already do) support fusing .map_batches() calls with different batch sizes.

@amogkam (Contributor, Author) commented Oct 12, 2022

cc @c21 - it seems that stage fusion is supported for multiple map_batches calls with different batch sizes.

@c21 (Contributor) commented Oct 12, 2022

We don't do map_batches fusion by default, right? We only do it in lazy execution mode.

@amogkam (Contributor, Author) commented Oct 12, 2022

Right, but that's true regardless of what the batch size is.

@ericl (Contributor) commented Oct 12, 2022 via email

@amogkam (Contributor, Author) commented Oct 12, 2022

> AIR force enables lazy mode for BatchPredictor, so we can assume it's on.

I don't think this is currently the case...where is this happening in the code?

@ericl (Contributor) commented Oct 12, 2022

Ah, I'm mistaken. It's not force-enabling lazy mode, but you do effectively get stage fusion when using most preprocessors after a read stage. This is because the read stage is always partially lazy. Hence, as long as you only have a simple preprocessor transform after the read, you effectively have a fully lazy plan / execution.

@ericl (Contributor) commented Oct 12, 2022

We should probably just explicitly call .lazy() in BatchPredictor and avoid relying on this.

@clarkzinzow (Contributor)

> We should probably just explicitly call .lazy() in BatchPredictor and avoid relying on this.

Strong +1 to this, and the same goes for when triggering preprocessing before training, especially since at most one preprocess transform (i.e. .map_batches() operation) will be fused into the read stage on an eager dataset; all subsequent preprocessor transforms will be unfused.
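A minimal sketch of what that could look like inside BatchPredictor, assuming the Dataset.lazy() API that existed at the time (the helper is hypothetical):

def _preprocess(preprocessor, ds):
    # Hypothetical: convert to a lazy dataset up front so that every
    # preprocessor map_batches() call can participate in stage fusion,
    # instead of only the first transform after the (partially lazy) read.
    if hasattr(ds, "lazy"):
        ds = ds.lazy()
    return preprocessor.transform(ds)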

@c21 (Contributor) commented Oct 12, 2022

> We should probably just explicitly call .lazy() in BatchPredictor and avoid relying on this.

+1 as well. This will be much cleaner.

@amogkam (Contributor, Author) commented Oct 13, 2022

So just to summarize the discussion:

Setting Datasets to lazy by default for BatchPredictor makes sense, but that seems orthogonal to the API discussion about batch size. Fusion should work regardless of the batch sizes of the individual map_batches calls.

It seems the primary pushback to Option 2 is that there is too much configuration complexity in having to set a batch size for each preprocessor?

@jiaodong (Member)

> It seems the primary pushback to Option 2 is that there is too much configuration complexity in having to set a batch size for each preprocessor?

We can always have a simple default (like batch size = block size, or batch size = 4096) for all preprocessors without enforcing it, right? I think in 95%+ of cases the end user will never need to know about or tune it, but for internal developers, being able to use this flag can benefit efficiency and benchmarking.

clarkzinzow added the air and RFC labels Oct 17, 2022
@clarkzinzow (Contributor)

I think we need tiers of overridable defaults here, both at a single preprocessor level and at a global level:

  1. Default batch size for all preprocessors that preprocessors or the caller can override.
  2. Default batch size for each preprocessor that the caller can override.
  3. API for callers (training, prediction) to override batch sizes.

One question I have is whether (3) should be a single batch size for all preprocessors, or if we should support overriding single preprocessors in a chain at call time.
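As a sketch, the three tiers above could resolve with a simple precedence rule (all names hypothetical):

GLOBAL_DEFAULT_BATCH_SIZE = 4096  # tier 1: library-wide default

def resolve_batch_size(caller_override=None, preprocessor_default=None):
    # Hypothetical precedence: caller override (tier 3) beats the
    # per-preprocessor default (tier 2), which beats the global default (tier 1).
    if caller_override is not None:
        return caller_override
    if preprocessor_default is not None:
        return preprocessor_default
    return GLOBAL_DEFAULT_BATCH_SIZE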

@amogkam (Contributor, Author) commented Oct 18, 2022

@clarkzinzow @ericl and I chatted just now.

The decision is to go with Option 1, and introduce Option 2 if needed down the line.

We will change the default batch size at the AIR level to be less than 4096 (probably 256 by default). We will also log a warning if batch size is not specified for training or prediction.
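A sketch of what that could look like (the 256 default is the proposed value; the helper and message are hypothetical):

import warnings

DEFAULT_AIR_BATCH_SIZE = 256  # proposed AIR-level default, down from 4096

def _resolve_air_batch_size(batch_size=None):
    # Hypothetical: warn when the caller relies on the implicit default.
    if batch_size is None:
        warnings.warn(
            f"No batch_size was specified; falling back to the default of "
            f"{DEFAULT_AIR_BATCH_SIZE}. Set batch_size explicitly to tune "
            f"memory usage and throughput."
        )
        return DEFAULT_AIR_BATCH_SIZE
    return batch_size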

amogkam self-assigned this Oct 21, 2022
krfricke added and then removed the triage label Jan 18, 2023
ericl removed their assignment Mar 9, 2023
stale bot commented Aug 12, 2023

Hi, I'm a bot from the Ray team :)

To help human contributors focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

stale bot added the stale label Aug 12, 2023
stale bot commented Oct 15, 2023

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

stale bot closed this as completed Oct 15, 2023