Skip to content

Commit

Permalink
refine input of writer
Browse files: browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Feb 23, 2025
1 parent 9d76521 commit 797379e
Showing 1 changed file with 84 additions and 52 deletions.
136 changes: 84 additions & 52 deletions crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

//! Position delete file writer.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use arrow_array::builder::{PrimitiveBuilder, StringBuilder};
Expand All @@ -29,17 +31,21 @@ use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

const POS_DELETE_FIELD1_NAME: &str = "file_path";
const POS_DELETE_FIELD1_ID: i32 = 2147483546;
const POS_DELETE_FIELD2_NAME: &str = "pos";
const POS_DELETE_FIELD2_ID: i32 = 2147483545;
static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::required(
2147483546,
"file_path",
POS_DELETE_FIELD1_ID,
POS_DELETE_FIELD1_NAME,
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::required(
2147483545,
"pos",
POS_DELETE_FIELD2_ID,
POS_DELETE_FIELD2_NAME,
Type::Primitive(PrimitiveType::Long),
)),
])
Expand All @@ -49,17 +55,17 @@ static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {

/// Position delete input.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)]
pub struct PositionDeleteInput {
pub struct PositionDeleteInput<'a> {
/// The path of the file.
pub path: String,
/// The offset of the position delete.
pub offsets: Vec<i64>,
pub path: &'a str,
/// The row number in the data file.
pub pos: i64,
}

impl PositionDeleteInput {
impl<'a> PositionDeleteInput<'a> {
/// Create a new `PositionDeleteInput`.
pub fn new(path: String, offsets: Vec<i64>) -> Self {
PositionDeleteInput { path, offsets }
pub fn new(path: &'a str, row: i64) -> Self {
Self { path, pos: row }
}
}
/// Builder for `PositionDeleteWriter`.
Expand All @@ -80,7 +86,7 @@ impl<B: FileWriterBuilder> PositionDeleteWriterBuilder<B> {
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput>>
impl<'a, B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput<'a>>>
for PositionDeleteWriterBuilder<B>
{
type R = PositionDeleteWriter<B>;
Expand All @@ -99,16 +105,22 @@ pub struct PositionDeleteWriter<B: FileWriterBuilder> {
partition_value: Struct,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriter<B> {
async fn write(&mut self, inputs: Vec<PositionDeleteInput>) -> Result<()> {
impl<'a, B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput<'a>>>
for PositionDeleteWriter<B>
{
fn write<'life0, 'async_trait>(
&'life0 mut self,
input: Vec<PositionDeleteInput<'a>>,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait,
{
let mut path_column_builder = StringBuilder::new();
let mut offset_column_builder = PrimitiveBuilder::<Int64Type>::new();
for pd_input in inputs.into_iter() {
for offset in pd_input.offsets {
path_column_builder.append_value(&pd_input.path);
offset_column_builder.append_value(offset);
}
for pd_input in input.into_iter() {
path_column_builder.append_value(pd_input.path);
offset_column_builder.append_value(pd_input.pos);
}
let record_batch = RecordBatch::try_new(
Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()),
Expand All @@ -117,28 +129,38 @@ impl<B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput>> for PositionD
Arc::new(offset_column_builder.finish()),
],
)
.map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()))?;
.map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()));

if let Some(inner_writer) = &mut self.inner_writer {
inner_writer.write(&record_batch).await?;
} else {
return Err(Error::new(ErrorKind::Unexpected, "write has been closed"));
}
Ok(())
Box::pin(async move {
if let Some(inner_writer) = &mut self.inner_writer {
inner_writer.write(&record_batch?).await?;
} else {
return Err(Error::new(ErrorKind::Unexpected, "write has been closed"));
}
Ok(())
})
}

async fn close(&mut self) -> Result<Vec<DataFile>> {
let writer = self.inner_writer.take().unwrap();
Ok(writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(DataContentType::PositionDeletes);
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
})
.collect())
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<Vec<DataFile>>> + Send + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait,
{
Box::pin(async move {
let writer = self.inner_writer.take().unwrap();
Ok(writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(DataContentType::PositionDeletes);
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
})
.collect())
})
}
}

Expand Down Expand Up @@ -184,28 +206,38 @@ mod test {
// Write some position delete inputs
let inputs: Vec<PositionDeleteInput> = vec![
PositionDeleteInput {
path: "file2.parquet".to_string(),
offsets: vec![2, 1, 3],
path: "file2.parquet",
pos: 2,
},
PositionDeleteInput {
path: "file2.parquet",
pos: 1,
},
PositionDeleteInput {
path: "file2.parquet",
pos: 3,
},
PositionDeleteInput {
path: "file3.parquet",
pos: 2,
},
PositionDeleteInput {
path: "file1.parquet",
pos: 5,
},
PositionDeleteInput {
path: "file3.parquet".to_string(),
offsets: vec![2],
path: "file1.parquet",
pos: 4,
},
PositionDeleteInput {
path: "file1.parquet".to_string(),
offsets: vec![5, 4, 1],
path: "file1.parquet",
pos: 1,
},
];
let expect_inputs = inputs
.clone()
.into_iter()
.flat_map(|input| {
input
.offsets
.iter()
.map(|off| (input.path.clone(), *off))
.collect::<Vec<_>>()
})
.map(|input| (input.path.to_string(), input.pos))
.collect_vec();
position_delete_writer.write(inputs.clone()).await?;

Expand Down

0 comments on commit 797379e

Please sign in to comment.