
[data] [streaming] Support a streaming_repartition() operator #36724

Closed

ericl opened this issue Jun 22, 2023 · 5 comments
Assignees
Labels
data (Ray Data-related issues), enhancement (Request for new feature and/or capability), P1 (Issue that should be fixed within a few weeks)

Comments

@ericl
Contributor

ericl commented Jun 22, 2023

In several use cases, it is useful to change the block size of datasets in a streaming way. The current repartition() operator is an all-to-all operator and is incompatible with streaming.

We could implement a general purpose streaming_repartition() operator that supports repartitioning in a few streaming-compatible ways:

  • Splitting/coalescing blocks into a certain number of rows
  • Splitting/coalescing blocks into a certain in-memory byte size
  • Splitting/coalescing blocks into K pieces

This could be implemented as a new PhysicalOperator that performs the repartitioning online (a rough sketch follows below). It could also replace the current SplitBlocks mechanism from #36352.
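To make the row-count mode concrete, here is a rough, hypothetical sketch of the splitting/coalescing logic. This is not an actual Ray operator API; the Arrow-table block type and the generator interface are assumptions for illustration only:

```python
import pyarrow as pa
from typing import Iterable, Iterator

def streaming_repartition(blocks: Iterable[pa.Table],
                          target_rows: int) -> Iterator[pa.Table]:
    """Re-chunk a stream of Arrow tables into blocks of ~target_rows rows.

    Illustrative only: a real operator would also bound buffered memory
    and support the byte-size and K-way modes listed above.
    """
    buffer: list[pa.Table] = []
    buffered_rows = 0
    for block in blocks:
        buffer.append(block)
        buffered_rows += block.num_rows
        # Emit full blocks as soon as enough rows are buffered (streaming).
        while buffered_rows >= target_rows:
            combined = pa.concat_tables(buffer)
            yield combined.slice(0, target_rows)
            remainder = combined.slice(target_rows)
            buffer = [remainder] if remainder.num_rows else []
            buffered_rows = remainder.num_rows
    # Flush whatever is left as a final, possibly smaller block.
    if buffered_rows:
        yield pa.concat_tables(buffer)

# Example: 3 uneven input blocks re-chunked to ~4-row blocks.
inputs = [pa.table({"x": list(range(n))}) for n in (2, 7, 3)]
print([b.num_rows for b in streaming_repartition(inputs, target_rows=4)])
# -> [4, 4, 4]
```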

@ericl ericl added enhancement Request for new feature and/or capability P1 Issue that should be fixed within a few weeks data Ray Data-related issues labels Jun 22, 2023
@luxunxiansheng

Suppose I have 20 large files and I implement a specific datasource for them. I would like to load the dataset with read_datasource at a parallelism of, say, 200. I see that SplitBlocks splits each block into many smaller blocks. My question is: how does SplitBlocks work? Does it split a single large file, row by row, into many binary parts and then coalesce them somewhere downstream?

@ericl
Contributor Author

ericl commented Oct 5, 2023

SplitBlocks works within the read task to split the read output into multiple smaller pieces. These will remain as smaller individual blocks for the remainder of the computation unless the dataset is explicitly repartitioned.

Ray Data will automatically insert SplitBlocks to ensure the desired/autodetected parallelism is met after a read.
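For example, something like the following (hypothetical path; the parallelism argument shown here was later renamed override_num_blocks in newer Ray releases):

```python
import ray

# Assumed scenario from the question above: 20 large files, but we want
# ~200 blocks so downstream tasks operate on smaller units of work.
ds = ray.data.read_parquet(
    "s3://my-bucket/large-files/",  # hypothetical path, for illustration
    parallelism=200,
)

# Each read task's output is split into smaller blocks to meet the
# requested parallelism; those smaller blocks persist downstream unless
# the dataset is explicitly repartitioned.
print(ds.materialize().num_blocks())
```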

@anyscalesam anyscalesam added P2 Important issue, but not time-critical and removed P1 Issue that should be fixed within a few weeks labels Nov 8, 2023
@alexeykudinkin alexeykudinkin self-assigned this Nov 22, 2024
@alexeykudinkin alexeykudinkin added P1 Issue that should be fixed within a few weeks and removed P2 Important issue, but not time-critical labels Nov 22, 2024
@anrooo

anrooo commented Dec 20, 2024

Also been looking for this feature. We have pipelines where processing a single row can take many minutes, and we need greater control over block size at various points in our pipelines.

raulchen pushed a commit that referenced this issue Feb 11, 2025
## Why are these changes needed?
Add repartition by target number of rows per block
Addresses #36724

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com>
@drcege

drcege commented Mar 3, 2025

@srinathk10 Why was this closed; does #50179 provide a true streaming split, i.e., without needing to materialize the whole dataset?

The documentation still says this operation requires all inputs to be materialized in the object store for it to execute. Could you clarify what "all inputs" means here: the entire dataset, or only the input of the current node?

Can I take it that with shuffle=False this is a streaming split (the whole dataset is not materialized)?

xsuler pushed a commit to antgroup/ant-ray that referenced this issue Mar 4, 2025
## Why are these changes needed?
Add repartition by target number of rows per block
Addresses ray-project#36724

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com>
@srinathk10
Contributor

https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.repartition.html
@drcege Thanks for pointing out the inconsistency in the documentation. I will put out a patch for it.

Yes, #50179 does handle streaming repartition when target_num_rows_per_block is set.
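A minimal usage sketch of that mode (the exact signature depends on your Ray version; see the docs link above):

```python
import ray

ds = ray.data.range(10_000)  # small example dataset

# Streaming repartition: re-chunk blocks to ~1,000 rows each as they
# flow through the pipeline, without a full shuffle, so the whole
# dataset does not need to be materialized first.
ds = ds.repartition(target_num_rows_per_block=1_000)

# Contrast: repartition(num_blocks=..., shuffle=True) is an all-to-all
# operation that materializes its inputs.
for batch in ds.iter_batches(batch_size=1_000):
    pass
```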

park12sj pushed a commit to park12sj/ray that referenced this issue Mar 18, 2025
## Why are these changes needed?
Add repartition by target number of rows per block
Addresses ray-project#36724

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com>
jaychia pushed a commit to jaychia/ray that referenced this issue Mar 19, 2025
## Why are these changes needed?
Add repartition by target number of rows per block
Addresses ray-project#36724

---------

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Jay Chia <17691182+jaychia@users.noreply.github.com>