feat: allow async stream for writing and appending to a dataset #3146

Merged: 1 commit merged into lancedb:main on Nov 22, 2024

Conversation

@HoKim98 (Contributor) commented on Nov 20, 2024

This PR allows end users to use a SendableRecordBatchStream and Schema directly when writing or appending to a dataset.

Being able to write and append async streams to a dataset is essential.

Related Issues

Partially resolves #1792.

Side-effects

This PR has the following side effects.

Changed

  • peek_stream_schema => abstracted by StreamingWriteSource::into_stream_and_schema.

Added

  • StreamingWriteSource
    • StreamingWriteSource::into_stream => the former reader_to_stream, but also supports streams.
    • StreamingWriteSource::into_stream_and_schema => the former (reader_to_stream, peek_stream_schema) pair, but also supports streams. (A rough sketch of the abstraction follows below.)
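To make the shape of this abstraction concrete, here is a rough, self-contained sketch of what such a trait could look like. It is not the merged code: it uses futures::stream::BoxStream as a stand-in for DataFusion's SendableRecordBatchStream, uses ArrowError in place of Lance's own error type, and asks the source for its schema directly, whereas the real into_stream_and_schema peeks the stream asynchronously (see the review discussion further down). All names other than into_stream and into_stream_and_schema are assumptions made for the example.

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use futures::stream::{self, BoxStream, StreamExt};

// Stand-in for DataFusion's SendableRecordBatchStream, for this sketch only.
type BatchStream = BoxStream<'static, Result<RecordBatch, ArrowError>>;

pub trait StreamingWriteSource: Send + 'static {
    // The role previously played by reader_to_stream.
    fn into_stream(self) -> BatchStream;

    // The schema of the batches the stream will yield.
    fn schema(&self) -> SchemaRef;

    // The role previously played by the (reader_to_stream, peek_stream_schema) pair.
    // The real method peeks the first batch asynchronously; this sketch simplifies.
    fn into_stream_and_schema(self) -> (BatchStream, SchemaRef)
    where
        Self: Sized,
    {
        let schema = self.schema();
        (self.into_stream(), schema)
    }
}

// A boxed reader already knows its schema and can be drained as a stream.
impl<R: RecordBatchReader + Send + 'static> StreamingWriteSource for Box<R> {
    fn into_stream(self) -> BatchStream {
        stream::iter(self).boxed()
    }

    fn schema(&self) -> SchemaRef {
        RecordBatchReader::schema(self.as_ref())
    }
}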

@github-actions bot added the enhancement (New feature or request) label on Nov 20, 2024
@codecov-commenter commented on Nov 20, 2024

Codecov Report

Attention: Patch coverage is 58.75000% with 33 lines in your changes missing coverage. Please review.

Project coverage is 77.95%. Comparing base (1d3b204) to head (24f3884).
Report is 6 commits behind head on main.

Files with missing lines                    Patch %   Lines
rust/lance-datafusion/src/utils.rs          54.16%    18 Missing and 4 partials ⚠️
rust/lance/src/dataset/write/insert.rs      69.23%    3 Missing and 1 partial ⚠️
java/core/lance-jni/src/fragment.rs         0.00%     2 Missing ⚠️
rust/lance/src/dataset/fragment/write.rs    66.66%    1 Missing and 1 partial ⚠️
rust/lance/src/dataset/write.rs             0.00%     2 Missing ⚠️
rust/lance/src/dataset/schema_evolution.rs  0.00%     1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3146      +/-   ##
==========================================
- Coverage   77.95%   77.95%   -0.01%     
==========================================
  Files         242      242              
  Lines       81904    82455     +551     
==========================================
+ Hits        63848    64275     +427     
- Misses      14890    14969      +79     
- Partials     3166     3211      +45     
Flag Coverage Δ
unittests 77.95% <58.75%> (-0.01%) ⬇️

@wjones127 (Contributor) left a comment:
Thanks for opening a PR. Seeing as both readers and streams are internally transformed into streams and handled the same way, I'd rather not have separate methods for them. If we could keep just one execute_stream and not add a new execute_reader, that would be my preference.

The only reason I created a separate method for Vec<RecordBatch> is that I later intend for the implementation to be different. (It will be able to split the data up and write in parallel, which you can't do with a stream.)

Could you instead try something like:

pub trait StreamingWriteSource {
    fn try_into_stream(self) -> Result<SendableRecordBatchStream>;
}

impl StreamingWriteSource for SendableRecordBatchStream { ... }
impl StreamingWriteSource for Box<dyn RecordBatchReader> { ... }

pub async fn write_stream(
    stream: impl StreamingWriteSource,
    schema: Schema,
    dest: impl Into<WriteDestination<'_>>,
    params: Option<WriteParams>,
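For what it's worth, one plausible way to fill in the impl bodies elided above, written against DataFusion's public types. This is a hedged sketch, not the code that was merged: it adds a Send bound to the boxed reader so the result can be a SendableRecordBatchStream, and the error conversion is simplified to DataFusion's error type rather than Lance's.

use arrow_array::RecordBatchReader;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

pub trait StreamingWriteSource {
    fn try_into_stream(self) -> Result<SendableRecordBatchStream>;
}

// A stream is already in the right shape; hand it back unchanged.
impl StreamingWriteSource for SendableRecordBatchStream {
    fn try_into_stream(self) -> Result<SendableRecordBatchStream> {
        Ok(self)
    }
}

// A reader is adapted: wrap its synchronous iterator in a stream that
// carries the reader's schema along with it.
impl StreamingWriteSource for Box<dyn RecordBatchReader + Send> {
    fn try_into_stream(self) -> Result<SendableRecordBatchStream> {
        let schema = self.schema();
        let batches = futures::stream::iter(self).map(|b| b.map_err(DataFusionError::from));
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, batches)))
    }
}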

Comment on lines 483 to 490
/// Append to existing [Dataset] with a stream of [RecordBatch]s
///
/// Returns void result or Returns [Error]
pub async fn append_stream(
    &mut self,
    stream: SendableRecordBatchStream,
    schema: Schema,
    params: Option<WriteParams>,
@wjones127 (Contributor)

I'd also rather not add these methods. If we keep adding these we'll end up with too many APIs. The InsertBuilder is public and I'd rather we use that. If we want, we could add write_builder(&self) -> WriteBuilder; and I'd be fine with that.

@HoKim98 (Contributor, Author) replied on Nov 21, 2024

Implemented by abstracting the pair (stream, schema peeked from the stream) into StreamingWriteSource.

Comment on lines 445 to 446
let (batches, schema) = peek_reader_schema(Box::new(batches)).await?;
let stream = reader_to_stream(batches);
@wjones127 (Contributor)

We do some important logic related to dictionary arrays in peek_reader_schema, so I don't think we should take the Schema from the user in the stream case. Instead, we should flip this around:

        let stream = reader_to_stream(batches);
        let (stream, schema) = peek_stream_schema(stream).await?;

Then for the stream case we can also use the peek_stream_schema() method.
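For readers following along, the general shape of "peek the schema from a stream" looks roughly like the following. This is an illustration only, not the lance implementation, and it omits the dictionary-array handling mentioned above; BatchStream and peek_schema are names invented for this sketch.

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef};
use futures::stream::{self, BoxStream, StreamExt};

type BatchStream = BoxStream<'static, Result<RecordBatch, ArrowError>>;

// Pull the first batch, read its schema, then put that batch back in front
// of the remaining stream so the caller still sees every batch.
async fn peek_schema(mut input: BatchStream) -> Result<(BatchStream, Option<SchemaRef>), ArrowError> {
    match input.next().await {
        Some(first) => {
            let first = first?;
            let schema = first.schema();
            let rebuilt = stream::iter([Ok(first)]).chain(input).boxed();
            Ok((rebuilt, Some(schema)))
        }
        None => Ok((input, None)),
    }
}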

@HoKim98 (Contributor, Author) replied on Nov 21, 2024

Implemented the proposed logic via StreamingWriteSource::into_stream_and_schema. People who do not need the schema can instead use StreamingWriteSource::into_stream, which is the former reader_to_stream. Of course, the reader_to_stream function will not be removed.

@HoKim98 (Contributor, Author) commented on Nov 21, 2024

Applied StreamingWriteSource and minimized the side-effects.

@wjones127 (Contributor) left a comment:

Thanks for making those modifications. :)

Once you fix the code size issue, I am ready to approve.

Comment on lines 62 to 67
  pub async fn write(
      &self,
-     reader: impl RecordBatchReader + Send + 'static,
+     source: impl StreamingWriteSource,
      id: Option<u64>,
  ) -> Result<Fragment> {
-     let (stream, schema) = self.get_stream_and_schema(Box::new(reader)).await?;
-     self.write_impl(stream, schema, id).await
+     let id = id.unwrap_or_default();
@wjones127 (Contributor)

Please separate this back out into write() and write_impl. Each concrete type passed into write() generates a new copy of the code for it. We have write() dispatch into write_impl to minimize the amount of code that gets duplicated.

@HoKim98 (Contributor, Author)

Because the new StreamingWriteSource requires Self: Sized, I convert the source into a (stream, schema) pair, a set of concrete types, before entering the _impl methods (see the sketch below).
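A small illustration of the pattern being discussed, with invented types rather than the lance code: the thin generic front-end is monomorphized once per concrete source type, but it only converts the source and forwards, so the bulky write logic in the non-generic back-end is compiled exactly once.

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef};
use futures::stream::{BoxStream, StreamExt};

type BatchStream = BoxStream<'static, Result<RecordBatch, ArrowError>>;

// Minimal stand-in for the trait discussed in this thread.
pub trait StreamingWriteSource {
    fn into_stream_and_schema(self) -> (BatchStream, SchemaRef);
}

// Thin generic front-end: a new copy is generated for each concrete source
// type a caller passes in, but it only converts the source and forwards.
pub async fn write(source: impl StreamingWriteSource, id: Option<u64>) -> Result<u64, ArrowError> {
    let (stream, schema) = source.into_stream_and_schema();
    write_impl(stream, schema, id).await
}

// Non-generic back-end: compiled exactly once, no matter how many source
// types exist. (The body here just counts rows as a placeholder.)
async fn write_impl(
    mut stream: BatchStream,
    _schema: SchemaRef,
    _id: Option<u64>,
) -> Result<u64, ArrowError> {
    let mut rows = 0u64;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows() as u64;
    }
    Ok(rows)
}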

@HoKim98 (Contributor, Author) commented on Nov 22, 2024

I reverted the flattening of the _impl methods. Please let me know if this is suitable!

@wjones127 (Contributor) left a comment:

This looks good now. Thanks for working through my feedback. Great work, @HoKim98 !

@wjones127 merged commit e12a769 into lancedb:main on Nov 22, 2024
26 checks passed
Labels
enhancement (New feature or request), java, python
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ergonomics around writing data (creating new datasets and streaming)
3 participants