diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 57dacdb580..905fd2111d 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3698,6 +3698,7 @@ def write_dataset( use_legacy_format: Optional[bool] = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False, + file_writer_options: Optional[Dict[str, str]] = None, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -3756,6 +3757,9 @@ def write_dataset( These row ids are stable after compaction operations, but not after updates. This makes compaction more efficient, since with stable row ids no secondary indices need to be updated to point to new row ids. + file_writer_options : optional, dict + Extra file write options that make sense to control lance data file. This is + used to set page size for file in disk. """ if use_legacy_format is not None: warnings.warn( @@ -3790,6 +3794,7 @@ def write_dataset( "data_storage_version": data_storage_version, "enable_v2_manifest_paths": enable_v2_manifest_paths, "enable_move_stable_row_ids": enable_move_stable_row_ids, + "file_writer_options": file_writer_options, } if commit_lock: diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index dd17fbcf42..766f0f3588 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -265,6 +265,7 @@ def create( data_storage_version: Optional[str] = None, use_legacy_format: Optional[bool] = None, storage_options: Optional[Dict[str, str]] = None, + file_writer_options: Optional[Dict[str, str]] = None, ) -> FragmentMetadata: """Create a :class:`FragmentMetadata` from the given data. @@ -304,7 +305,9 @@ def create( storage_options : optional, dict Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. - + file_writer_options : optional, dict + Extra file write options that make sense to control lance data file. This is + used to set page size for file in disk. See Also -------- lance.dataset.LanceOperation.Overwrite : @@ -345,6 +348,7 @@ def create( mode=mode, data_storage_version=data_storage_version, storage_options=storage_options, + file_writer_options=file_writer_options, ) @property diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 39a00d7fb1..c2d8ee4035 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -27,6 +27,14 @@ use async_trait::async_trait; use blob::LanceBlobFile; use chrono::Duration; +use crate::error::PythonErrorExt; +use crate::file::object_store_from_uri_or_path; +use crate::fragment::FileFragment; +use crate::schema::LanceSchema; +use crate::session::Session; +use crate::utils::PyLance; +use crate::RT; +use crate::{LanceReader, Scanner}; use arrow_array::Array; use futures::{StreamExt, TryFutureExt}; use lance::dataset::builder::DatasetBuilder; @@ -50,6 +58,7 @@ use lance::dataset::{ColumnAlteration, ProjectionRequest}; use lance::index::vector::utils::get_vector_type; use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt}; use lance_arrow::as_fixed_size_list_array; +use lance_file::v2::writer::FileWriterOptions; use lance_index::scalar::InvertedIndexParams; use lance_index::{ optimize::OptimizeOptions, @@ -77,15 +86,6 @@ use pyo3::{ use pyo3::{prelude::*, IntoPyObjectExt}; use snafu::location; -use crate::error::PythonErrorExt; -use crate::file::object_store_from_uri_or_path; -use crate::fragment::FileFragment; -use crate::schema::LanceSchema; -use crate::session::Session; -use crate::utils::PyLance; -use crate::RT; -use crate::{LanceReader, Scanner}; - use self::cleanup::CleanupStats; use self::commit::PyCommitLock; @@ -1730,6 +1730,32 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult>(options, "file_writer_options")? + { + let mut file_writer_options = FileWriterOptions::default(); + if let Some(max_page_bytes) = writer_options.get("max_page_bytes") { + file_writer_options.max_page_bytes = match max_page_bytes.parse::() { + Ok(n) => Some(n), + Err(_e) => None, + } + } + if let Some(data_cache_bytes) = writer_options.get("data_cache_bytes") { + file_writer_options.data_cache_bytes = match data_cache_bytes.parse::() { + Ok(n) => Some(n), + Err(_e) => None, + } + } + if let Some(keep_original_array) = writer_options.get("keep_original_array") { + file_writer_options.keep_original_array = match keep_original_array.parse::() + { + Ok(n) => Some(n), + Err(_e) => None, + } + } + p.file_writer_options = Some(file_writer_options); + } + if let Some(enable_move_stable_row_ids) = get_dict_opt::(options, "enable_move_stable_row_ids")? { diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 213e4a8ee3..4bdc1364c4 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -8,7 +8,6 @@ use lance_core::datatypes::Schema; use lance_core::Error; use lance_datafusion::chunker::{break_stream, chunk_stream}; use lance_datafusion::utils::StreamingWriteSource; -use lance_file::v2::writer::FileWriterOptions; use lance_file::version::LanceFileVersion; use lance_file::writer::FileWriter; use lance_io::object_store::ObjectStore; @@ -99,11 +98,9 @@ impl<'a> FragmentCreateBuilder<'a> { let mut fragment = Fragment::new(id); let full_path = base_path.child(DATA_DIR).child(filename.clone()); let obj_writer = object_store.create(&full_path).await?; - let mut writer = lance_file::v2::writer::FileWriter::try_new( - obj_writer, - schema, - FileWriterOptions::default(), - )?; + let file_write_options = params.file_writer_options.clone().unwrap_or_default(); + let mut writer = + lance_file::v2::writer::FileWriter::try_new(obj_writer, schema, file_write_options)?; let (major, minor) = writer.version().to_numbers(); diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 750cfb6eec..e62a3b4e41 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -148,6 +148,7 @@ impl Updater { &schema, &self.fragment.dataset().base, data_storage_version, + None, ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 0347300f9e..265c3d2a5f 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -172,6 +172,8 @@ pub struct WriteParams { pub object_store_registry: Arc, pub session: Option>, + + pub file_writer_options: Option, } impl Default for WriteParams { @@ -191,6 +193,7 @@ impl Default for WriteParams { enable_v2_manifest_paths: false, object_store_registry: Arc::new(ObjectStoreRegistry::default()), session: None, + file_writer_options: None, } } } @@ -249,7 +252,13 @@ pub async fn do_write_fragments( .boxed() }; - let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version); + let writer_generator = WriterGenerator::new( + object_store, + base_dir, + schema, + storage_version, + params.clone(), + ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); @@ -508,6 +517,7 @@ pub async fn open_writer( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, + params: Option<&WriteParams>, ) -> Result> { let filename = format!("{}.lance", Uuid::new_v4()); @@ -525,15 +535,15 @@ pub async fn open_writer( filename, )) } else { + let mut file_write_options = params + .as_ref() + .and_then(|wp| wp.file_writer_options.as_ref()) + .cloned() + .unwrap_or_default(); + file_write_options.format_version = Some(storage_version); let writer = object_store.create(&full_path).await?; - let file_writer = v2::writer::FileWriter::try_new( - writer, - schema.clone(), - FileWriterOptions { - format_version: Some(storage_version), - ..Default::default() - }, - )?; + let file_writer = + v2::writer::FileWriter::try_new(writer, schema.clone(), file_write_options)?; let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, @@ -549,6 +559,7 @@ struct WriterGenerator { base_dir: Path, schema: Schema, storage_version: LanceFileVersion, + params: WriteParams, } impl WriterGenerator { @@ -557,12 +568,14 @@ impl WriterGenerator { base_dir: &Path, schema: &Schema, storage_version: LanceFileVersion, + params: WriteParams, ) -> Self { Self { object_store, base_dir: base_dir.clone(), schema: schema.clone(), storage_version, + params, } } @@ -575,6 +588,7 @@ impl WriterGenerator { &self.schema, &self.base_dir, self.storage_version, + Some(&self.params), ) .await?; diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 92c847b64b..d04351455d 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -721,6 +721,7 @@ impl MergeInsertJob { &write_schema, &dataset.base, data_storage_version, + None, ) .await?;