Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add file writer option for python sdk #3523

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3667,6 +3667,7 @@ def write_dataset(
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = False,
enable_move_stable_row_ids: bool = False,
file_writer_options: Optional[Dict[str, str]] = None,
) -> LanceDataset:
"""Write a given data_obj to the given uri

Expand Down Expand Up @@ -3725,6 +3726,9 @@ def write_dataset(
These row ids are stable after compaction operations, but not after updates.
This makes compaction more efficient, since with stable row ids no
secondary indices need to be updated to point to new row ids.
file_writer_options : optional, dict
Extra options that control how Lance data files are written. For example,
this can be used to set the page size of data files on disk.
Comment on lines +3761 to +3762
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should probably document somewhere what the keys are.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine. I will add docs and tests after the CI issues are resolved.

"""
if use_legacy_format is not None:
warnings.warn(
Expand Down Expand Up @@ -3759,6 +3763,7 @@ def write_dataset(
"data_storage_version": data_storage_version,
"enable_v2_manifest_paths": enable_v2_manifest_paths,
"enable_move_stable_row_ids": enable_move_stable_row_ids,
"file_writer_options": file_writer_options,
}

if commit_lock:
Expand Down
6 changes: 5 additions & 1 deletion python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def create(
data_storage_version: Optional[str] = None,
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
file_writer_options: Optional[Dict[str, str]] = None,
) -> FragmentMetadata:
"""Create a :class:`FragmentMetadata` from the given data.

Expand Down Expand Up @@ -304,7 +305,9 @@ def create(
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.

file_writer_options : optional, dict
Extra options that control how Lance data files are written. For example,
this can be used to set the page size of data files on disk.
See Also
--------
lance.dataset.LanceOperation.Overwrite :
Expand Down Expand Up @@ -345,6 +348,7 @@ def create(
mode=mode,
data_storage_version=data_storage_version,
storage_options=storage_options,
file_writer_options=file_writer_options,
)

@property
Expand Down
44 changes: 35 additions & 9 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ use async_trait::async_trait;
use blob::LanceBlobFile;
use chrono::Duration;

use crate::error::PythonErrorExt;
use crate::file::object_store_from_uri_or_path;
use crate::fragment::FileFragment;
use crate::schema::LanceSchema;
use crate::session::Session;
use crate::utils::PyLance;
use crate::RT;
use crate::{LanceReader, Scanner};
use arrow_array::Array;
use futures::{StreamExt, TryFutureExt};
use lance::dataset::builder::DatasetBuilder;
Expand All @@ -50,6 +58,7 @@ use lance::dataset::{ColumnAlteration, ProjectionRequest};
use lance::index::vector::utils::get_vector_type;
use lance::index::{vector::VectorIndexParams, DatasetIndexInternalExt};
use lance_arrow::as_fixed_size_list_array;
use lance_file::v2::writer::FileWriterOptions;
use lance_index::scalar::InvertedIndexParams;
use lance_index::{
optimize::OptimizeOptions,
Expand Down Expand Up @@ -77,15 +86,6 @@ use pyo3::{
use pyo3::{prelude::*, IntoPyObjectExt};
use snafu::location;

use crate::error::PythonErrorExt;
use crate::file::object_store_from_uri_or_path;
use crate::fragment::FileFragment;
use crate::schema::LanceSchema;
use crate::session::Session;
use crate::utils::PyLance;
use crate::RT;
use crate::{LanceReader, Scanner};

use self::cleanup::CleanupStats;
use self::commit::PyCommitLock;

Expand Down Expand Up @@ -1725,6 +1725,32 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult<Option<WritePar
});
}

if let Some(writer_options) =
get_dict_opt::<HashMap<String, String>>(options, "file_writer_options")?
{
let mut file_writer_options = FileWriterOptions::default();
if let Some(max_page_bytes) = writer_options.get("max_page_bytes") {
file_writer_options.max_page_bytes = match max_page_bytes.parse::<u64>() {
Ok(n) => Some(n),
Err(_e) => None,
}
}
if let Some(data_cache_bytes) = writer_options.get("data_cache_bytes") {
file_writer_options.data_cache_bytes = match data_cache_bytes.parse::<u64>() {
Ok(n) => Some(n),
Err(_e) => None,
}
}
if let Some(keep_original_array) = writer_options.get("keep_original_array") {
file_writer_options.keep_original_array = match keep_original_array.parse::<bool>()
{
Ok(n) => Some(n),
Err(_e) => None,
}
}
p.file_writer_options = Some(file_writer_options);
}

if let Some(enable_move_stable_row_ids) =
get_dict_opt::<bool>(options, "enable_move_stable_row_ids")?
{
Expand Down
9 changes: 3 additions & 6 deletions rust/lance/src/dataset/fragment/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use lance_core::datatypes::Schema;
use lance_core::Error;
use lance_datafusion::chunker::{break_stream, chunk_stream};
use lance_datafusion::utils::StreamingWriteSource;
use lance_file::v2::writer::FileWriterOptions;
use lance_file::version::LanceFileVersion;
use lance_file::writer::FileWriter;
use lance_io::object_store::ObjectStore;
Expand Down Expand Up @@ -99,11 +98,9 @@ impl<'a> FragmentCreateBuilder<'a> {
let mut fragment = Fragment::new(id);
let full_path = base_path.child(DATA_DIR).child(filename.clone());
let obj_writer = object_store.create(&full_path).await?;
let mut writer = lance_file::v2::writer::FileWriter::try_new(
obj_writer,
schema,
FileWriterOptions::default(),
)?;
let file_write_options = params.file_writer_options.clone().unwrap_or_default();
let mut writer =
lance_file::v2::writer::FileWriter::try_new(obj_writer, schema, file_write_options)?;

let (major, minor) = writer.version().to_numbers();

Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl Updater {
&schema,
&self.fragment.dataset().base,
data_storage_version,
None,
)
.await
}
Expand Down
32 changes: 23 additions & 9 deletions rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ pub struct WriteParams {
pub object_store_registry: Arc<ObjectStoreRegistry>,

pub session: Option<Arc<Session>>,

pub file_writer_options: Option<FileWriterOptions>,
}

impl Default for WriteParams {
Expand All @@ -191,6 +193,7 @@ impl Default for WriteParams {
enable_v2_manifest_paths: false,
object_store_registry: Arc::new(ObjectStoreRegistry::default()),
session: None,
file_writer_options: None,
}
}
}
Expand Down Expand Up @@ -249,7 +252,13 @@ pub async fn do_write_fragments(
.boxed()
};

let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version);
let writer_generator = WriterGenerator::new(
object_store,
base_dir,
schema,
storage_version,
params.clone(),
);
let mut writer: Option<Box<dyn GenericWriter>> = None;
let mut num_rows_in_current_file = 0;
let mut fragments = Vec::new();
Expand Down Expand Up @@ -508,6 +517,7 @@ pub async fn open_writer(
schema: &Schema,
base_dir: &Path,
storage_version: LanceFileVersion,
params: Option<&WriteParams>,
) -> Result<Box<dyn GenericWriter>> {
let filename = format!("{}.lance", Uuid::new_v4());

Expand All @@ -525,15 +535,15 @@ pub async fn open_writer(
filename,
))
} else {
let mut file_write_options = params
.as_ref()
.and_then(|wp| wp.file_writer_options.as_ref())
.cloned()
.unwrap_or_default();
file_write_options.format_version = Some(storage_version);
let writer = object_store.create(&full_path).await?;
let file_writer = v2::writer::FileWriter::try_new(
writer,
schema.clone(),
FileWriterOptions {
format_version: Some(storage_version),
..Default::default()
},
)?;
let file_writer =
v2::writer::FileWriter::try_new(writer, schema.clone(), file_write_options)?;
let writer_adapter = V2WriterAdapter {
writer: file_writer,
path: filename,
Expand All @@ -549,6 +559,7 @@ struct WriterGenerator {
base_dir: Path,
schema: Schema,
storage_version: LanceFileVersion,
params: WriteParams,
}

impl WriterGenerator {
Expand All @@ -557,12 +568,14 @@ impl WriterGenerator {
base_dir: &Path,
schema: &Schema,
storage_version: LanceFileVersion,
params: WriteParams,
) -> Self {
Self {
object_store,
base_dir: base_dir.clone(),
schema: schema.clone(),
storage_version,
params,
}
}

Expand All @@ -575,6 +588,7 @@ impl WriterGenerator {
&self.schema,
&self.base_dir,
self.storage_version,
Some(&self.params),
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ impl MergeInsertJob {
&write_schema,
&dataset.base,
data_storage_version,
None,
)
.await?;

Expand Down
Loading