Skip to content

Commit 6194619

Browse files
authored
feat: add support for pickling fragment metadata (#3497)
1 parent 89a33b7 commit 6194619

File tree

3 files changed

+161
-1
lines changed

3 files changed

+161
-1
lines changed

python/python/lance/lance/fragment.pyi

+61-1
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,65 @@ class DeletionFile:
5656
"""
5757
...
5858

59+
def json(self) -> str:
    """Serialize this deletion file to a JSON string.

    Returns
    -------
    str

    Warning
    -------
    The JSON representation is not guaranteed to be stable across versions.
    """
    ...
71+
72+
@classmethod
73+
def from_json(json: str) -> DeletionFile:
74+
"""
75+
Load a deletion file from a JSON representation.
76+
77+
Parameters
78+
----------
79+
json : str
80+
The JSON representation of the deletion file.
81+
82+
Returns
83+
-------
84+
DeletionFile
85+
"""
86+
...
87+
88+
def __reduce__(self) -> tuple: ...
89+
5990
class RowIdMeta:
    def json(self) -> str:
        """Get a JSON representation of the row id metadata.

        Returns
        -------
        str

        Warning
        -------
        The JSON representation is not guaranteed to be stable across versions.
        """
        ...

    # NOTE(review): the Rust implementation exposes this as `#[staticmethod]`
    # and the signature has no `cls` parameter, so the stub must be a
    # @staticmethod (a @classmethod here would mistype the first argument).
    @staticmethod
    def from_json(json: str) -> "RowIdMeta":
        """
        Load row id metadata from a JSON representation.

        Parameters
        ----------
        json : str
            The JSON representation of the row id metadata.

        Returns
        -------
        RowIdMeta
        """
        ...

    def __reduce__(self) -> tuple:
        # Pickle support: returns (RowIdMeta.from_json, (json_state,)).
        ...

python/python/tests/test_fragment.py

+24
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import json
55
import multiprocessing
6+
import pickle
67
import uuid
78
from pathlib import Path
89

@@ -435,3 +436,26 @@ def test_fragment_count_rows(tmp_path: Path):
435436
assert fragments[0].count_rows() == 800
436437
assert fragments[0].count_rows("a < 200") == 200
437438
assert fragments[0].count_rows(pc.field("a") < 200) == 200
439+
440+
441+
@pytest.mark.parametrize("enable_move_stable_row_ids", [False, True])
def test_fragment_metadata_pickle(tmp_path: Path, enable_move_stable_row_ids: bool):
    """Fragment metadata round-trips through pickle, with and without
    move-stable row ids enabled."""
    dataset = write_dataset(
        pa.table({"a": range(100)}),
        tmp_path,
        enable_move_stable_row_ids=enable_move_stable_row_ids,
    )
    # Deleting rows forces a deletion file onto the fragment.
    dataset.delete("a < 50")
    fragment = dataset.get_fragments()[0]

    original = fragment.metadata

    assert original.deletion_file is not None
    if enable_move_stable_row_ids:
        assert original.row_id_meta is not None

    # Round-trip the fragment metadata through pickle.
    restored = pickle.loads(pickle.dumps(original))

    assert original == restored

python/src/fragment.rs

+76
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use lance::Error;
2727
use lance_table::format::{DataFile, DeletionFile, DeletionFileType, Fragment, RowIdMeta};
2828
use lance_table::io::deletion::deletion_file_path;
2929
use object_store::path::Path;
30+
use pyo3::basic::CompareOp;
31+
use pyo3::types::PyTuple;
3032
use pyo3::{exceptions::*, types::PyDict};
3133
use pyo3::{intern, prelude::*};
3234
use snafu::location;
@@ -507,6 +509,43 @@ impl PyDeletionFile {
507509
};
508510
Ok(deletion_file_path(&base_path, fragment_id, &self.0).to_string())
509511
}
512+
513+
pub fn json(&self) -> PyResult<String> {
514+
serde_json::to_string(&self.0).map_err(|err| {
515+
PyValueError::new_err(format!(
516+
"Could not dump CompactionPlan due to error: {}",
517+
err
518+
))
519+
})
520+
}
521+
522+
#[staticmethod]
523+
pub fn from_json(json: String) -> PyResult<Self> {
524+
let deletion_file = serde_json::from_str(&json).map_err(|err| {
525+
PyValueError::new_err(format!("Could not load DeletionFile due to error: {}", err))
526+
})?;
527+
Ok(Self(deletion_file))
528+
}
529+
530+
fn __reduce__(&self, py: Python<'_>) -> PyResult<(PyObject, PyObject)> {
531+
let state = self.json()?;
532+
let state = PyTuple::new_bound(py, vec![state]).extract()?;
533+
let from_json = PyModule::import_bound(py, "lance.fragment")?
534+
.getattr("DeletionFile")?
535+
.getattr("from_json")?
536+
.extract()?;
537+
Ok((from_json, state))
538+
}
539+
540+
pub fn __richcmp__(&self, other: PyRef<'_, Self>, op: CompareOp) -> PyResult<bool> {
541+
match op {
542+
CompareOp::Eq => Ok(self.0 == other.0),
543+
CompareOp::Ne => Ok(self.0 != other.0),
544+
_ => Err(PyNotImplementedError::new_err(
545+
"Only == and != are supported for CompactionTask",
546+
)),
547+
}
548+
}
510549
}
511550

512551
#[pyclass(name = "RowIdMeta", module = "lance.fragment")]
@@ -519,6 +558,43 @@ impl PyRowIdMeta {
519558
"PyRowIdMeta.asdict is not yet supported.s",
520559
))
521560
}
561+
562+
pub fn json(&self) -> PyResult<String> {
563+
serde_json::to_string(&self.0).map_err(|err| {
564+
PyValueError::new_err(format!(
565+
"Could not dump CompactionPlan due to error: {}",
566+
err
567+
))
568+
})
569+
}
570+
571+
#[staticmethod]
572+
pub fn from_json(json: String) -> PyResult<Self> {
573+
let row_id_meta = serde_json::from_str(&json).map_err(|err| {
574+
PyValueError::new_err(format!("Could not load RowIdMeta due to error: {}", err))
575+
})?;
576+
Ok(Self(row_id_meta))
577+
}
578+
579+
fn __reduce__(&self, py: Python<'_>) -> PyResult<(PyObject, PyObject)> {
580+
let state = self.json()?;
581+
let state = PyTuple::new_bound(py, vec![state]).extract()?;
582+
let from_json = PyModule::import_bound(py, "lance.fragment")?
583+
.getattr("RowIdMeta")?
584+
.getattr("from_json")?
585+
.extract()?;
586+
Ok((from_json, state))
587+
}
588+
589+
pub fn __richcmp__(&self, other: PyRef<'_, Self>, op: CompareOp) -> PyResult<bool> {
590+
match op {
591+
CompareOp::Eq => Ok(self.0 == other.0),
592+
CompareOp::Ne => Ok(self.0 != other.0),
593+
_ => Err(PyNotImplementedError::new_err(
594+
"Only == and != are supported for CompactionTask",
595+
)),
596+
}
597+
}
522598
}
523599

524600
impl FromPyObject<'_> for PyLance<Fragment> {

0 commit comments

Comments
 (0)