From 1d225af533e965b7de807ea2fe08e74da45ed970 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Fri, 7 Mar 2025 14:06:51 +0800 Subject: [PATCH 1/7] feat: add file writer option for python sdk --- python/python/lance/dataset.py | 5 ++++ python/python/lance/fragment.py | 6 +++- python/src/dataset.rs | 30 ++++++++++++++++++- .../src/encodings/physical/value.rs | 3 +- rust/lance/src/dataset/fragment/write.rs | 11 +++---- rust/lance/src/dataset/updater.rs | 1 + rust/lance/src/dataset/write.rs | 21 +++++++++---- rust/lance/src/dataset/write/merge_insert.rs | 1 + 8 files changed, 65 insertions(+), 13 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cac86b1df9..fdd1b8588c 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3667,6 +3667,7 @@ def write_dataset( use_legacy_format: Optional[bool] = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False, + file_writer_options: Optional[Dict[str, str]] = None, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -3725,6 +3726,9 @@ def write_dataset( These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids. + file_writer_options : optional, dict + Extra file write options that make sense to control lance data file. This is + used to set page size for file in disk. """ if use_legacy_format is not None: warnings.warn( @@ -3759,6 +3763,7 @@ def write_dataset( "data_storage_version": data_storage_version, "enable_v2_manifest_paths": enable_v2_manifest_paths, "enable_move_stable_row_ids": enable_move_stable_row_ids, + "file_writer_options": file_writer_options, } if commit_lock: diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index dd17fbcf42..88b4416113 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -265,6 +265,7 @@ def create( data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, + file_writer_options: Optional[Dict[str, str]] = None, ) -> FragmentMetadata: """Create a :class:`FragmentMetadata` from the given data. @@ -304,7 +305,9 @@ def create( storage_options : optional, dict Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. - + file_writer_options : optional, dict + Extra file write options that make sense to control lance data file. This is + used to set page size for file in disk. See Also -------- lance.dataset.LanceOperation.Overwrite : @@ -345,6 +348,7 @@ def create( mode=mode, data_storage_version=data_storage_version, storage_options=storage_options, + file_writer_options=file_writer_options ) @property diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 5779cf8172..b1f23d1c83 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -76,7 +76,7 @@ use pyo3::{ }; use pyo3::{prelude::*, IntoPyObjectExt}; use snafu::location; - +use lance_file::v2::writer::FileWriterOptions; use crate::error::PythonErrorExt; use crate::file::object_store_from_uri_or_path; use crate::fragment::FileFragment; @@ -1725,6 +1725,34 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult>(options, "file_writer_options")? + { + let mut file_writer_options = FileWriterOptions::default(); + if let Some(max_page_bytes) = writer_options.get("max_page_bytes") + { + file_writer_options.max_page_bytes = match max_page_bytes.parse::() { + Ok(n) => Some(n), + Err(_e) => None + } + } + if let Some(data_cache_bytes) = writer_options.get("data_cache_bytes") + { + file_writer_options.data_cache_bytes = match data_cache_bytes.parse::() { + Ok(n) => Some(n), + Err(_e) => None + } + } + if let Some(keep_original_array) = writer_options.get("keep_original_array") + { + file_writer_options.keep_original_array = match keep_original_array.parse::() { + Ok(n) => Some(n), + Err(_e) => None + } + } + p.file_writer_options = Some(file_writer_options); + } + if let Some(enable_move_stable_row_ids) = get_dict_opt::(options, "enable_move_stable_row_ids")? { diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index c1543daa4f..4d80e6b967 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -140,8 +140,9 @@ impl ValuePageDecoder { let bytes_u8: Vec = self.data[0].to_vec(); let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config); let mut uncompressed_bytes: Vec = Vec::new(); + println!("before decompress {}", bytes_u8.len()); buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?; - + println!("after decompress {}", uncompressed_bytes.len()); let mut bytes_in_ranges: Vec = Vec::with_capacity(self.uncompressed_range_offsets.len()); for range in &self.uncompressed_range_offsets { diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 213e4a8ee3..81d7f89c7c 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -99,11 +99,12 @@ impl<'a> FragmentCreateBuilder<'a> { let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); let obj_writer = object_store.create(&full_path).await?; - let mut writer = lance_file::v2::writer::FileWriter::try_new( - obj_writer, - schema, - FileWriterOptions::default(), - )?; + let file_write_options = params + .file_writer_options + .clone() + .unwrap_or(FileWriterOptions::default()); + let mut writer = + lance_file::v2::writer::FileWriter::try_new(obj_writer, schema, file_write_options)?; let (major, minor) = writer.version().to_numbers(); diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 750cfb6eec..eb04427792 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -148,6 +148,7 @@ impl Updater { &schema, &self.fragment.dataset().base, data_storage_version, + None ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 0347300f9e..22b7e31312 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -172,6 +172,8 @@ pub struct WriteParams { pub object_store_registry: Arc, pub session: Option>, + + pub file_writer_options: Option, } impl Default for WriteParams { @@ -191,6 +193,7 @@ impl Default for WriteParams { enable_v2_manifest_paths: false, object_store_registry: Arc::new(ObjectStoreRegistry::default()), session: None, + file_writer_options: None, } } } @@ -249,7 +252,7 @@ pub async fn do_write_fragments( .boxed() }; - let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version); + let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version, params.clone()); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); @@ -508,6 +511,7 @@ pub async fn open_writer( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, + params: Option<&WriteParams>, ) -> Result> { let filename = format!("{}.lance", Uuid::new_v4()); @@ -525,14 +529,17 @@ pub async fn open_writer( filename, )) } else { + let mut file_write_options = params + .as_ref() + .and_then(|wp| wp.file_writer_options.as_ref()) + .cloned() + .unwrap_or_default(); + file_write_options.format_version = Some(storage_version); let writer = object_store.create(&full_path).await?; let file_writer = v2::writer::FileWriter::try_new( writer, schema.clone(), - FileWriterOptions { - format_version: Some(storage_version), - ..Default::default() - }, + file_write_options, )?; let writer_adapter = V2WriterAdapter { writer: file_writer, @@ -549,6 +556,7 @@ struct WriterGenerator { base_dir: Path, schema: Schema, storage_version: LanceFileVersion, + params: WriteParams, } impl WriterGenerator { @@ -557,12 +565,14 @@ impl WriterGenerator { base_dir: &Path, schema: &Schema, storage_version: LanceFileVersion, + params: WriteParams, ) -> Self { Self { object_store, base_dir: base_dir.clone(), schema: schema.clone(), storage_version, + params, } } @@ -575,6 +585,7 @@ impl WriterGenerator { &self.schema, &self.base_dir, self.storage_version, + Some(&self.params) ) .await?; diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 92c847b64b..d04351455d 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -721,6 +721,7 @@ impl MergeInsertJob { &write_schema, &dataset.base, data_storage_version, + None, ) .await?; From 37d9992cb5ce2894c9f5834dd81b0d3c05192111 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Mon, 10 Mar 2025 17:54:01 +0800 Subject: [PATCH 2/7] fix code --- python/python/lance/fragment.py | 2 +- .../src/encodings/physical/value.rs | 3 +-- rust/lance/src/dataset/updater.rs | 2 +- rust/lance/src/dataset/write.rs | 17 ++++++++++------- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index 88b4416113..766f0f3588 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -348,7 +348,7 @@ def create( mode=mode, data_storage_version=data_storage_version, storage_options=storage_options, - file_writer_options=file_writer_options + file_writer_options=file_writer_options, ) @property diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index 4d80e6b967..c1543daa4f 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -140,9 +140,8 @@ impl ValuePageDecoder { let bytes_u8: Vec = self.data[0].to_vec(); let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config); let mut uncompressed_bytes: Vec = Vec::new(); - println!("before decompress {}", bytes_u8.len()); buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?; - println!("after decompress {}", uncompressed_bytes.len()); + let mut bytes_in_ranges: Vec = Vec::with_capacity(self.uncompressed_range_offsets.len()); for range in &self.uncompressed_range_offsets { diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index eb04427792..e62a3b4e41 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -148,7 +148,7 @@ impl Updater { &schema, &self.fragment.dataset().base, data_storage_version, - None + None, ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 22b7e31312..265c3d2a5f 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -252,7 +252,13 @@ pub async fn do_write_fragments( .boxed() }; - let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version, params.clone()); + let writer_generator = WriterGenerator::new( + object_store, + base_dir, + schema, + storage_version, + params.clone(), + ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); @@ -536,11 +542,8 @@ pub async fn open_writer( .unwrap_or_default(); file_write_options.format_version = Some(storage_version); let writer = object_store.create(&full_path).await?; - let file_writer = v2::writer::FileWriter::try_new( - writer, - schema.clone(), - file_write_options, - )?; + let file_writer = + v2::writer::FileWriter::try_new(writer, schema.clone(), file_write_options)?; let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, @@ -585,7 +588,7 @@ impl WriterGenerator { &self.schema, &self.base_dir, self.storage_version, - Some(&self.params) + Some(&self.params), ) .await?; From 94e4ccef5b46f813133cfa327705149572eb5ac4 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Mon, 10 Mar 2025 19:08:55 +0800 Subject: [PATCH 3/7] fix ci --- rust/lance/src/dataset/fragment/write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 81d7f89c7c..9781cb52e7 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -102,7 +102,7 @@ impl<'a> FragmentCreateBuilder<'a> { let file_write_options = params .file_writer_options .clone() - .unwrap_or(FileWriterOptions::default()); + .unwrap_or_default(); let mut writer = lance_file::v2::writer::FileWriter::try_new(obj_writer, schema, file_write_options)?; From 41d79c144654f4efc84783195fb800473230d708 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Mon, 10 Mar 2025 20:29:02 +0800 Subject: [PATCH 4/7] fix ci --- rust/lance/src/dataset/fragment/write.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 9781cb52e7..4bdc1364c4 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -8,7 +8,6 @@ use lance_core::datatypes::Schema; use lance_core::Error; use lance_datafusion::chunker::{break_stream, chunk_stream}; use lance_datafusion::utils::StreamingWriteSource; -use lance_file::v2::writer::FileWriterOptions; use lance_file::version::LanceFileVersion; use lance_file::writer::FileWriter; use lance_io::object_store::ObjectStore; @@ -99,10 +98,7 @@ impl<'a> FragmentCreateBuilder<'a> { let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); let obj_writer = object_store.create(&full_path).await?; - let file_write_options = params - .file_writer_options - .clone() - .unwrap_or_default(); + let file_write_options = params.file_writer_options.clone().unwrap_or_default(); let mut writer = lance_file::v2::writer::FileWriter::try_new(obj_writer, schema, file_write_options)?; From 1916a4f65aae4a32ace4b61be3cd4464c7904fe3 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Tue, 11 Mar 2025 15:47:14 +0800 Subject: [PATCH 5/7] fix ci --- python/src/dataset.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index b1f23d1c83..af168e5343 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1729,25 +1729,22 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult>(options, "file_writer_options")? { let mut file_writer_options = FileWriterOptions::default(); - if let Some(max_page_bytes) = writer_options.get("max_page_bytes") - { + if let Some(max_page_bytes) = writer_options.get("max_page_bytes") { file_writer_options.max_page_bytes = match max_page_bytes.parse::() { Ok(n) => Some(n), - Err(_e) => None + Err(_e) => None, } } - if let Some(data_cache_bytes) = writer_options.get("data_cache_bytes") - { + if let Some(data_cache_bytes) = writer_options.get("data_cache_bytes") { file_writer_options.data_cache_bytes = match data_cache_bytes.parse::() { Ok(n) => Some(n), - Err(_e) => None + Err(_e) => None, } } - if let Some(keep_original_array) = writer_options.get("keep_original_array") - { + if let Some(keep_original_array) = writer_options.get("keep_original_array") { file_writer_options.keep_original_array = match keep_original_array.parse::() { Ok(n) => Some(n), - Err(_e) => None + Err(_e) => None, } } p.file_writer_options = Some(file_writer_options); From 878c4f74387c526ded3c2b62e24943f1e56e825b Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Tue, 11 Mar 2025 15:54:46 +0800 Subject: [PATCH 6/7] fix ci --- python/src/dataset.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index af168e5343..9727ace9e2 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1742,7 +1742,8 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult() { + file_writer_options.keep_original_array = match keep_original_array.parse::() + { Ok(n) => Some(n), Err(_e) => None, } From 3debaccbc7709d60fdf114e0e992c2f956a04cab Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 12 Mar 2025 13:56:43 +0800 Subject: [PATCH 7/7] fix ci --- python/src/dataset.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 9727ace9e2..2e6bfb346e 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -27,6 +27,14 @@ use async_trait::async_trait; use blob::LanceBlobFile; use chrono::Duration; +use crate::error::PythonErrorExt; +use crate::file::object_store_from_uri_or_path; +use crate::fragment::FileFragment; +use crate::schema::LanceSchema; +use crate::session::Session; +use crate::utils::PyLance; +use crate::RT; +use crate::{LanceReader, Scanner}; use arrow_array::Array; use futures::{StreamExt, TryFutureExt}; use lance::dataset::builder::DatasetBuilder; @@ -50,6 +58,7 @@ use lance::dataset::{ColumnAlteration, ProjectionRequest}; use lance::index::vector::utils::get_vector_type; use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt}; use lance_arrow::as_fixed_size_list_array; +use lance_file::v2::writer::FileWriterOptions; use lance_index::scalar::InvertedIndexParams; use lance_index::{ optimize::OptimizeOptions, @@ -76,15 +85,6 @@ use pyo3::{ }; use pyo3::{prelude::*, IntoPyObjectExt}; use snafu::location; -use lance_file::v2::writer::FileWriterOptions; -use crate::error::PythonErrorExt; -use crate::file::object_store_from_uri_or_path; -use crate::fragment::FileFragment; -use crate::schema::LanceSchema; -use crate::session::Session; -use crate::utils::PyLance; -use crate::RT; -use crate::{LanceReader, Scanner}; use self::cleanup::CleanupStats; use self::commit::PyCommitLock;