Skip to content

Commit 45436a2

Browse files
committed
feat: add file writer option for python sdk
1 parent 9a1fdaf commit 45436a2

File tree

8 files changed

+65
-13
lines changed

8 files changed

+65
-13
lines changed

python/python/lance/dataset.py

+5
Original file line numberDiff line numberDiff line change
@@ -3667,6 +3667,7 @@ def write_dataset(
36673667
use_legacy_format: Optional[bool] = None,
36683668
enable_v2_manifest_paths: bool = False,
36693669
enable_move_stable_row_ids: bool = False,
3670+
file_writer_options: Optional[Dict[str, str]] = None,
36703671
) -> LanceDataset:
36713672
"""Write a given data_obj to the given uri
36723673
@@ -3725,6 +3726,9 @@ def write_dataset(
37253726
These row ids are stable after compaction operations, but not after updates.
37263727
This makes compaction more efficient, since with stable row ids no
37273728
secondary indices need to be updated to point to new row ids.
3729+
file_writer_options : optional, dict
3730+
Extra file writer options that control how Lance data files are written. This is
3731+
used, for example, to set the page size of the files on disk.
37283732
"""
37293733
if use_legacy_format is not None:
37303734
warnings.warn(
@@ -3759,6 +3763,7 @@ def write_dataset(
37593763
"data_storage_version": data_storage_version,
37603764
"enable_v2_manifest_paths": enable_v2_manifest_paths,
37613765
"enable_move_stable_row_ids": enable_move_stable_row_ids,
3766+
"file_writer_options": file_writer_options,
37623767
}
37633768

37643769
if commit_lock:

python/python/lance/fragment.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ def create(
265265
data_storage_version: Optional[str] = None,
266266
use_legacy_format: Optional[bool] = None,
267267
storage_options: Optional[Dict[str, str]] = None,
268+
file_writer_options: Optional[Dict[str, str]] = None,
268269
) -> FragmentMetadata:
269270
"""Create a :class:`FragmentMetadata` from the given data.
270271
@@ -304,7 +305,9 @@ def create(
304305
storage_options : optional, dict
305306
Extra options that make sense for a particular storage connection. This is
306307
used to store connection parameters like credentials, endpoint, etc.
307-
308+
file_writer_options : optional, dict
309+
Extra file writer options that control how Lance data files are written. This is
310+
used, for example, to set the page size of the files on disk.
308311
See Also
309312
--------
310313
lance.dataset.LanceOperation.Overwrite :
@@ -345,6 +348,7 @@ def create(
345348
mode=mode,
346349
data_storage_version=data_storage_version,
347350
storage_options=storage_options,
351+
file_writer_options=file_writer_options
348352
)
349353

350354
@property

python/src/dataset.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ use pyo3::{
7575
PyObject, PyResult,
7676
};
7777
use snafu::location;
78-
78+
use lance_file::v2::writer::FileWriterOptions;
7979
use crate::error::PythonErrorExt;
8080
use crate::file::object_store_from_uri_or_path;
8181
use crate::fragment::FileFragment;
@@ -1715,6 +1715,34 @@ pub fn get_write_params(options: &PyDict) -> PyResult<Option<WriteParams>> {
17151715
});
17161716
}
17171717

1718+
if let Some(writer_options) =
1719+
get_dict_opt::<HashMap<String, String>>(options, "file_writer_options")?
1720+
{
1721+
let mut file_writer_options = FileWriterOptions::default();
1722+
if let Some(max_page_bytes) = writer_options.get("max_page_bytes")
1723+
{
1724+
file_writer_options.max_page_bytes = match max_page_bytes.parse::<u64>() {
1725+
Ok(n) => Some(n),
1726+
Err(_e) => None
1727+
}
1728+
}
1729+
if let Some(data_cache_bytes) = writer_options.get("data_cache_bytes")
1730+
{
1731+
file_writer_options.data_cache_bytes = match data_cache_bytes.parse::<u64>() {
1732+
Ok(n) => Some(n),
1733+
Err(_e) => None
1734+
}
1735+
}
1736+
if let Some(keep_original_array) = writer_options.get("keep_original_array")
1737+
{
1738+
file_writer_options.keep_original_array = match keep_original_array.parse::<bool>() {
1739+
Ok(n) => Some(n),
1740+
Err(_e) => None
1741+
}
1742+
}
1743+
p.file_writer_options = Some(file_writer_options);
1744+
}
1745+
17181746
if let Some(enable_move_stable_row_ids) =
17191747
get_dict_opt::<bool>(options, "enable_move_stable_row_ids")?
17201748
{

rust/lance-encoding/src/encodings/physical/value.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,9 @@ impl ValuePageDecoder {
140140
let bytes_u8: Vec<u8> = self.data[0].to_vec();
141141
let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config);
142142
let mut uncompressed_bytes: Vec<u8> = Vec::new();
143+
println!("before decompress {}", bytes_u8.len());
143144
buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
144-
145+
println!("after decompress {}", uncompressed_bytes.len());
145146
let mut bytes_in_ranges: Vec<Bytes> =
146147
Vec::with_capacity(self.uncompressed_range_offsets.len());
147148
for range in &self.uncompressed_range_offsets {

rust/lance/src/dataset/fragment/write.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,12 @@ impl<'a> FragmentCreateBuilder<'a> {
9999
let mut fragment = Fragment::new(id);
100100
let full_path = base_path.child(DATA_DIR).child(filename.clone());
101101
let obj_writer = object_store.create(&full_path).await?;
102-
let mut writer = lance_file::v2::writer::FileWriter::try_new(
103-
obj_writer,
104-
schema,
105-
FileWriterOptions::default(),
106-
)?;
102+
let file_write_options = params
103+
.file_writer_options
104+
.clone()
105+
.unwrap_or(FileWriterOptions::default());
106+
let mut writer =
107+
lance_file::v2::writer::FileWriter::try_new(obj_writer, schema, file_write_options)?;
107108

108109
let (major, minor) = writer.version().to_numbers();
109110

rust/lance/src/dataset/updater.rs

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ impl Updater {
148148
&schema,
149149
&self.fragment.dataset().base,
150150
data_storage_version,
151+
None
151152
)
152153
.await
153154
}

rust/lance/src/dataset/write.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ pub struct WriteParams {
172172
pub object_store_registry: Arc<ObjectStoreRegistry>,
173173

174174
pub session: Option<Arc<Session>>,
175+
176+
pub file_writer_options: Option<FileWriterOptions>,
175177
}
176178

177179
impl Default for WriteParams {
@@ -191,6 +193,7 @@ impl Default for WriteParams {
191193
enable_v2_manifest_paths: false,
192194
object_store_registry: Arc::new(ObjectStoreRegistry::default()),
193195
session: None,
196+
file_writer_options: None,
194197
}
195198
}
196199
}
@@ -249,7 +252,7 @@ pub async fn do_write_fragments(
249252
.boxed()
250253
};
251254

252-
let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version);
255+
let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version, params.clone());
253256
let mut writer: Option<Box<dyn GenericWriter>> = None;
254257
let mut num_rows_in_current_file = 0;
255258
let mut fragments = Vec::new();
@@ -506,6 +509,7 @@ pub async fn open_writer(
506509
schema: &Schema,
507510
base_dir: &Path,
508511
storage_version: LanceFileVersion,
512+
params: Option<&WriteParams>,
509513
) -> Result<Box<dyn GenericWriter>> {
510514
let filename = format!("{}.lance", Uuid::new_v4());
511515

@@ -523,14 +527,17 @@ pub async fn open_writer(
523527
filename,
524528
))
525529
} else {
530+
let mut file_write_options = params
531+
.as_ref()
532+
.and_then(|wp| wp.file_writer_options.as_ref())
533+
.cloned()
534+
.unwrap_or_default();
535+
file_write_options.format_version = Some(storage_version);
526536
let writer = object_store.create(&full_path).await?;
527537
let file_writer = v2::writer::FileWriter::try_new(
528538
writer,
529539
schema.clone(),
530-
FileWriterOptions {
531-
format_version: Some(storage_version),
532-
..Default::default()
533-
},
540+
file_write_options,
534541
)?;
535542
let writer_adapter = V2WriterAdapter {
536543
writer: file_writer,
@@ -547,6 +554,7 @@ struct WriterGenerator {
547554
base_dir: Path,
548555
schema: Schema,
549556
storage_version: LanceFileVersion,
557+
params: WriteParams,
550558
}
551559

552560
impl WriterGenerator {
@@ -555,12 +563,14 @@ impl WriterGenerator {
555563
base_dir: &Path,
556564
schema: &Schema,
557565
storage_version: LanceFileVersion,
566+
params: WriteParams,
558567
) -> Self {
559568
Self {
560569
object_store,
561570
base_dir: base_dir.clone(),
562571
schema: schema.clone(),
563572
storage_version,
573+
params,
564574
}
565575
}
566576

@@ -573,6 +583,7 @@ impl WriterGenerator {
573583
&self.schema,
574584
&self.base_dir,
575585
self.storage_version,
586+
Some(&self.params)
576587
)
577588
.await?;
578589

rust/lance/src/dataset/write/merge_insert.rs

+1
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,7 @@ impl MergeInsertJob {
721721
&write_schema,
722722
&dataset.base,
723723
data_storage_version,
724+
None,
724725
)
725726
.await?;
726727

0 commit comments

Comments
 (0)