-
Notifications
You must be signed in to change notification settings - Fork 205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support position delete writer #704
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two kinds of writers in iceberg:
- Plain position delete writer: https://github.com/apache/iceberg/blob/da2ad389fd9ba8222f6fb3f57922209c239a7045/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java#L49
- Sorting position delete writer:
https://github.com/apache/iceberg/blob/da2ad389fd9ba8222f6fb3f57922209c239a7045/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java#L49
It seems that this pr tries to implement 2, while there are some missing part there. I would suggest to implement 1 first as it's easier, what do you think?
crates/iceberg/src/arrow/schema.rs
Outdated
@@ -607,6 +607,19 @@ impl SchemaVisitor for ToArrowSchemaConverter { | |||
} | |||
} | |||
|
|||
/// Convert iceberg field to an arrow field. | |||
pub fn field_to_arrow_field(field: &crate::spec::NestedFieldRef) -> Result<FieldRef> { | |||
let mut converter = ToArrowSchemaConverter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation is a little hack to me. How about just create a one field schema, and convert it using arrow schema, then get the result?
fn write<'life0, 'async_trait>( | ||
&'life0 mut self, | ||
input: PositionDeleteInput<'a>, | ||
) -> ::core::pin::Pin< | ||
Box<dyn ::core::future::Future<Output = Result<()>> + ::core::marker::Send + 'async_trait>, | ||
> | ||
where | ||
'life0: 'async_trait, | ||
Self: 'async_trait, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove these auto generated lifetime markers and prefix of types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For here we use a sync version so that seems we need to explicitly declare these auto-generated lifetime.
The reason we need the sync version is that the input takes the reference like: struct PositionDeleteInput<'a>
, we need to explicitly convert it into a record batch in the sync function part and then return a async future to write this record batch.
} | ||
|
||
/// The memory position delete writer. | ||
pub struct MemoryPositionDeleteWriter<B: FileWriterBuilder> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub struct MemoryPositionDeleteWriter<B: FileWriterBuilder> { | |
pub struct PositionDeleteWriter<B: FileWriterBuilder> { |
I don't think we should add a Memory
prefix here since it make people feel that we are storing everything in memory, and it applies to all structs.
Is position delete must be sorted or it just be optional? From the iceberg spec, it looks like it must be sorted. https://iceberg.apache.org/spec/#position-delete-files:~:text=The%20rows%20in%20the%20delete%20file%20must%20be%20sorted%20by%20file_path%20then%20pos%20to%20optimize%20filtering%20rows%20while%20scanning. |
Make sense. Let's implement 1 first |
I think we can resolve #741 first before this PR. |
@ZENOTME Are you still working on this? I'm looking to work on one of the two writers |
Sorry for the late, I will work on this later. Would you like to work on sorting position delete writer after this PR? |
Hi @liurenjie1024 @jonathanc-n. I have fixed this PR. It's ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall lgtm! I can get started on the sorting position delete writer next after merge
async fn write(&mut self, input: Vec<PositionDeleteInput>) -> Result<()> { | ||
let mut path_column_builder = StringBuilder::new(); | ||
let mut offset_column_builder = PrimitiveBuilder::<Int64Type>::new(); | ||
for input in input.into_iter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change variable here? ex. pd_input
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ZENOTME for this pr, generally LGTM, left some minor suggestions.
2147483546, | ||
"file_path", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make these constants.
2147483545, | ||
"pos", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
/// The offset of the position delete. | ||
pub offsets: Vec<i64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// The offset of the position delete. | |
pub offsets: Vec<i64>, | |
/// The row number in data file.. | |
pub row: i64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not ask user to think about the container.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)] | ||
pub struct PositionDeleteInput { | ||
/// The path of the file. | ||
pub path: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub path: String, | |
pub path: &'a str, |
} | ||
|
||
/// Position delete writer. | ||
pub struct PositionDeleteWriter<B: FileWriterBuilder> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should buffer in memory about for the input row number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that for PositionDeleteInput
, we should buffer them and write them as a batch?🤔 E.g.
pub struct PositionDeleteWriter {
// path -> row_num
buffer: HashMap<String, Vec<i64>>
}
For here I don't add the buffer because we will add SortPositionDeleteWriter later and it will buffer the input and sort them, so I'm not sure whether we need to add a buffer here. Or we can let it be a optional choice?
partition_value: Struct, | ||
} | ||
|
||
impl<'a, B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput<'a>>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't simply this code using #[async_trait]
because PositionDeleteInput take the reference, so in here we should convert them into RecordBatch first in sync code and then return a async function to write them. cc @liurenjie1024
797379e
to
391a061
Compare
#704 fail in msrv check and I find that's because `cargo update faststr` will update the munge to `0.4.2` instead of `0.4.1`. The simple fix way is to specify the precise version of munge. But I'm not sure whether it's good practice here. Do you have any suggestions for this? cc @Xuanwo @xxchan Co-authored-by: ZENOTME <st810918843@gmail.com>
Complete #340