Commit c901cbe

fix: add versioning and bypass broken row counts (lancedb#1534)
Adds a new feature: `WriterVersion` in the manifest.

Also fixes two bugs:

* lancedb#1531: adds bypass logic to read the correct `physical_rows` when an
  old writer version is detected. Will update the stored value on write as
  needed.
* When `migrate_fragments` runs, we do it on the new fragments and not the
  old, fixing a data loss issue.

Fixes lancedb#1531
Fixes lancedb#1535
1 parent 4465738 commit c901cbe

29 files changed: +376 -23
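The row-count bypass amounts to this: only trust a fragment's cached
`physical_rows` if the manifest records a writer version new enough to have
written it correctly; otherwise ignore it, recount from the data files, and
repair the stored value on the next write. A minimal sketch of that decision,
assuming the `WriterVersion` type added in this commit (the helper itself and
the 0.8.1 cutoff are illustrative, not the actual lance internals):

// Illustrative sketch, not the real lance code path.
fn use_cached_physical_rows(
    cached: Option<usize>,
    writer_version: Option<&WriterVersion>,
) -> Option<usize> {
    let trustworthy = writer_version
        .and_then(|wv| wv.semver())
        // Tuples compare lexicographically, so this is a plain version check.
        .map(|(major, minor, patch, _tag)| (major, minor, patch) >= (0, 8, 1))
        .unwrap_or(false);
    if trustworthy {
        cached // Serve the manifest's value directly.
    } else {
        None // Caller recounts from the data files and rewrites on commit.
    }
}

Manifests written before this commit have `writer_version` unset, so they take
the recount path automatically, which matches the bypass described above.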

.gitignore (+4 -3)

@@ -51,7 +51,6 @@ python/lance/_*.cpp
 
 *.parquet
 *.parq
-*.lance
 
 python/thirdparty/arrow/
 python/wheels
@@ -72,7 +71,9 @@ integration/duckdb/lance.duckdb_extension.*.zip
 
 notebooks/lance.duckdb_extension
 notebooks/sift
+notebooks/image_data/data
 benchmarks/sift/sift
+benchmarks/sift/sift.lance
 benchmarks/sift/lance_ivf*.csv
 **/sift.tar.gz
 
@@ -87,9 +88,9 @@ wheelhouse
 # Rust
 target
 Cargo.lock
-data
 
 # c++ lsp
 .ccls-cache/
 
-python/venv
+python/venv
+test_data/venv

protos/format.proto (+16)

@@ -64,6 +64,22 @@ message Manifest {
   // Schema metadata.
   map<string, bytes> metadata = 5;
 
+  message WriterVersion {
+    // The name of the library that created this file.
+    string library = 1;
+    // The version of the library that created this file. Because we cannot assume
+    // that the library is semantically versioned, this is a string. However, if it
+    // is semantically versioned, it should be a valid semver string without any 'v'
+    // prefix. For example: `2.0.0`, `2.0.0-rc.1`.
+    string version = 2;
+  }
+
+  // The version of the writer that created this file.
+  //
+  // This information may be used to detect whether the file may have known bugs
+  // associated with that writer.
+  WriterVersion writer_version = 13;
+
   // If presented, the file position of the index metadata.
   optional uint64 index_section = 6;
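A sketch of how a writer populates the new field, assuming `pb` is the
prost-generated module for this proto (as in the Rust changes below); the
version string is only an example:

// Illustrative only: build the submessage the way the From<&Manifest>
// impl below does.
let writer_version = Some(pb::manifest::WriterVersion {
    library: "lance".to_string(),
    // A semver string without a 'v' prefix, per the field docs.
    version: "0.8.0".to_string(),
});
// Manifests written before this change simply leave field 13 unset,
// so readers must treat it as optional.
let manifest = pb::Manifest {
    writer_version,
    ..Default::default()
};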

rust/lance-core/src/format.rs (+1 -1)

@@ -25,7 +25,7 @@ mod page_table;
 
 pub use fragment::*;
 pub use index::Index;
-pub use manifest::Manifest;
+pub use manifest::{Manifest, WriterVersion};
 pub use metadata::{Metadata, StatisticsMetadata};
 pub use page_table::{PageInfo, PageTable};
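With the re-export, downstream code can pull both types from a single path. A
minimal sketch, assuming only the public fields shown in the manifest.rs diff
below (the helper is hypothetical):

use lance_core::format::{Manifest, WriterVersion};

// Illustrative helper: render a manifest's writer for logs or errors.
fn describe_writer(manifest: &Manifest) -> String {
    match &manifest.writer_version {
        Some(WriterVersion { library, version }) => format!("{} {}", library, version),
        // Manifests written before this commit leave the field unset.
        None => "unknown writer".to_string(),
    }
}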

rust/lance-core/src/format/manifest.rs (+69)

@@ -40,6 +40,9 @@ pub struct Manifest {
     /// Dataset version
     pub version: u64,
 
+    /// Version of the writer library that wrote this manifest.
+    pub writer_version: Option<WriterVersion>,
+
     /// Fragments, the pieces to build the dataset.
     pub fragments: Arc<Vec<Fragment>>,
 
@@ -73,6 +76,7 @@ impl Manifest {
         Self {
             schema: schema.clone(),
             version: 1,
+            writer_version: Some(WriterVersion::default()),
             fragments,
             version_aux_data: 0,
             index_section: None,
@@ -93,6 +97,7 @@
         Self {
             schema: schema.clone(),
             version: previous.version + 1,
+            writer_version: Some(WriterVersion::default()),
             fragments,
             version_aux_data: 0,
             index_section: None, // Caller should update index if they want to keep them.
@@ -171,6 +176,34 @@
     }
 }
 
+#[derive(Debug, Clone, PartialEq)]
+pub struct WriterVersion {
+    pub library: String,
+    pub version: String,
+}
+
+impl WriterVersion {
+    /// Try to parse the version string as a semver string. Returns None if
+    /// not successful.
+    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
+        let mut parts = self.version.split('.');
+        let major = parts.next().unwrap_or("0").parse().ok()?;
+        let minor = parts.next().unwrap_or("0").parse().ok()?;
+        let patch = parts.next().unwrap_or("0").parse().ok()?;
+        let tag = parts.next();
+        Some((major, minor, patch, tag))
+    }
+}
+
+impl Default for WriterVersion {
+    fn default() -> Self {
+        Self {
+            library: "lance".to_string(),
+            version: env!("CARGO_PKG_VERSION").to_string(),
+        }
+    }
+}
+
 impl ProtoStruct for Manifest {
     type Proto = pb::Manifest;
 }
@@ -182,9 +215,17 @@ impl From<pb::Manifest> for Manifest {
             let nanos = ts.nanos as u128;
             sec + nanos
         });
+        // We only use the writer version if it is fully set.
+        let writer_version = match p.writer_version {
+            Some(pb::manifest::WriterVersion { library, version }) => {
+                Some(WriterVersion { library, version })
+            }
+            _ => None,
+        };
         Self {
             schema: Schema::from((&p.fields, p.metadata)),
             version: p.version,
+            writer_version,
             fragments: Arc::new(p.fragments.iter().map(Fragment::from).collect()),
             version_aux_data: p.version_aux_data as usize,
             index_section: p.index_section.map(|i| i as usize),
@@ -218,6 +259,13 @@ impl From<&Manifest> for pb::Manifest {
         Self {
             fields,
             version: m.version,
+            writer_version: m
+                .writer_version
+                .as_ref()
+                .map(|wv| pb::manifest::WriterVersion {
+                    library: wv.library.clone(),
+                    version: wv.version.clone(),
+                }),
             fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
             metadata,
             version_aux_data: m.version_aux_data as u64,
@@ -231,3 +279,24 @@
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_writer_version() {
+        let wv = WriterVersion::default();
+        assert_eq!(wv.library, "lance");
+        assert_eq!(wv.version, env!("CARGO_PKG_VERSION"));
+        assert_eq!(
+            wv.semver(),
+            Some((
+                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
+                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
+                env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(),
+                None
+            ))
+        );
+    }
+}
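A quick look at what `semver()` accepts, given the plain `split('.')`
implementation above (the expected values here are worked out by hand, not
taken from the repo's tests):

let wv = |version: &str| WriterVersion {
    library: "lance".to_string(),
    version: version.to_string(),
};

// Plain releases parse cleanly.
assert_eq!(wv("0.8.0").semver(), Some((0, 8, 0, None)));
// A fourth dot-separated component comes back as the tag.
assert_eq!(wv("0.8.0.beta1").semver(), Some((0, 8, 0, Some("beta1"))));
// A hyphenated pre-release such as "2.0.0-rc.1" (the proto doc's example)
// splits into ["2", "0", "0-rc", "1"], so the patch parse fails and this
// implementation returns None.
assert_eq!(wv("2.0.0-rc.1").semver(), None);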

rust/lance/src/dataset.rs (+188 -1)

@@ -1507,10 +1507,11 @@ mod tests {
     use arrow_schema::{DataType, Field, Fields as ArrowFields, Schema as ArrowSchema};
     use arrow_select::take::take;
     use futures::stream::TryStreamExt;
+    use lance_core::format::WriterVersion;
     use lance_index::vector::DIST_COL;
     use lance_linalg::distance::MetricType;
     use lance_testing::datagen::generate_random_array;
-    use tempfile::tempdir;
+    use tempfile::{tempdir, TempDir};
 
     // Used to validate that futures returned are Send.
     fn require_send<T: Send>(t: T) -> T {
@@ -1561,6 +1562,10 @@
 
         let actual_ds = Dataset::open(test_uri).await.unwrap();
         assert_eq!(actual_ds.version().version, 1);
+        assert_eq!(
+            actual_ds.manifest.writer_version,
+            Some(WriterVersion::default())
+        );
         let actual_schema = ArrowSchema::from(actual_ds.schema());
         assert_eq!(&actual_schema, schema.as_ref());
 
@@ -3188,4 +3193,186 @@
             &[1],
         );
     }
+
+    fn copy_dir_all(
+        src: impl AsRef<std::path::Path>,
+        dst: impl AsRef<std::path::Path>,
+    ) -> std::io::Result<()> {
+        use std::fs;
+        fs::create_dir_all(&dst)?;
+        for entry in fs::read_dir(src)? {
+            let entry = entry?;
+            let ty = entry.file_type()?;
+            if ty.is_dir() {
+                copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
+            } else {
+                fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Copies a test dataset into a temporary directory, returning the tmpdir.
+    ///
+    /// The `table_path` should be relative to `test_data/` at the root of the
+    /// repo.
+    fn copy_test_data_to_tmp(table_path: &str) -> std::io::Result<TempDir> {
+        use std::path::PathBuf;
+
+        let mut src = PathBuf::new();
+        src.push(env!("CARGO_MANIFEST_DIR"));
+        src.push("../../test_data");
+        src.push(table_path);
+
+        let test_dir = tempdir().unwrap();
+
+        copy_dir_all(src.as_path(), test_dir.path())?;
+
+        Ok(test_dir)
+    }
+
+    #[tokio::test]
+    async fn test_v0_7_5_migration() {
+        // We migrate to add Fragment.physical_rows and DeletionFile.num_deletions
+        // after this version.
+
+        // Copy over table
+        let test_dir = copy_test_data_to_tmp("v0.7.5/with_deletions").unwrap();
+        let test_uri = test_dir.path().to_str().unwrap();
+
+        // Assert num rows, deletions, and physical rows are all correct.
+        let dataset = Dataset::open(test_uri).await.unwrap();
+        assert_eq!(dataset.count_rows().await.unwrap(), 90);
+        assert_eq!(dataset.count_deleted_rows().await.unwrap(), 10);
+        let total_physical_rows = futures::stream::iter(dataset.get_fragments())
+            .then(|f| async move { f.physical_rows().await })
+            .try_fold(0, |acc, x| async move { Ok(acc + x) })
+            .await
+            .unwrap();
+        assert_eq!(total_physical_rows, 100);
+
+        // Append 5 rows
+        let schema = Arc::new(ArrowSchema::from(dataset.schema()));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int64Array::from_iter_values(100..105))],
+        )
+        .unwrap();
+        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let write_params = WriteParams {
+            mode: WriteMode::Append,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(batches, test_uri, Some(write_params))
+            .await
+            .unwrap();
+
+        // Assert num rows, deletions, and physical rows are all correct.
+        assert_eq!(dataset.count_rows().await.unwrap(), 95);
+        assert_eq!(dataset.count_deleted_rows().await.unwrap(), 10);
+        let total_physical_rows = futures::stream::iter(dataset.get_fragments())
+            .then(|f| async move { f.physical_rows().await })
+            .try_fold(0, |acc, x| async move { Ok(acc + x) })
+            .await
+            .unwrap();
+        assert_eq!(total_physical_rows, 105);
+
+        dataset.validate().await.unwrap();
+
+        // Scan data and assert it is as expected.
+        let expected = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int64Array::from_iter_values(
+                (0..10).chain(20..105),
+            ))],
+        )
+        .unwrap();
+        let actual_batches = dataset
+            .scan()
+            .try_into_stream()
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+        let actual = concat_batches(&actual_batches[0].schema(), &actual_batches).unwrap();
+        assert_eq!(actual, expected);
+    }
+
+    #[tokio::test]
+    async fn test_fix_v0_8_0_broken_migration() {
+        // The migration from v0.7.5 was broken in 0.8.0. This validates we can
+        // automatically fix tables that have this problem.
+
+        // Copy over table
+        let test_dir = copy_test_data_to_tmp("v0.8.0/migrated_from_v0.7.5").unwrap();
+        let test_uri = test_dir.path().to_str().unwrap();
+
+        // Assert num rows, deletions, and physical rows are all correct, even
+        // though stats are bad.
+        let dataset = Dataset::open(test_uri).await.unwrap();
+        assert_eq!(dataset.count_rows().await.unwrap(), 92);
+        assert_eq!(dataset.count_deleted_rows().await.unwrap(), 10);
+        let total_physical_rows = futures::stream::iter(dataset.get_fragments())
+            .then(|f| async move { f.physical_rows().await })
+            .try_fold(0, |acc, x| async move { Ok(acc + x) })
+            .await
+            .unwrap();
+        assert_eq!(total_physical_rows, 102);
+
+        // Append 5 rows to table.
+        let schema = Arc::new(ArrowSchema::from(dataset.schema()));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int64Array::from_iter_values(100..105))],
+        )
+        .unwrap();
+        let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let write_params = WriteParams {
+            mode: WriteMode::Append,
+            ..Default::default()
+        };
+        let dataset = Dataset::write(batches, test_uri, Some(write_params))
+            .await
+            .unwrap();
+
+        // Assert statistics are all now correct.
+        let physical_rows: Vec<_> = dataset
+            .get_fragments()
+            .iter()
+            .map(|f| f.metadata.physical_rows)
+            .collect();
+        assert_eq!(physical_rows, vec![Some(100), Some(2), Some(5)]);
+        let num_deletions: Vec<_> = dataset
+            .get_fragments()
+            .iter()
+            .map(|f| {
+                f.metadata
+                    .deletion_file
+                    .as_ref()
+                    .and_then(|df| df.num_deleted_rows)
+            })
+            .collect();
+        assert_eq!(num_deletions, vec![Some(10), None, None]);
+        assert_eq!(dataset.count_rows().await.unwrap(), 97);
+
+        // Scan data and assert it is as expected.
+        let expected = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int64Array::from_iter_values(
+                (0..10).chain(20..100).chain(0..2).chain(100..105),
+            ))],
+        )
+        .unwrap();
+        let actual_batches = dataset
+            .scan()
+            .try_into_stream()
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+        let actual = concat_batches(&actual_batches[0].schema(), &actual_batches).unwrap();
+        assert_eq!(actual, expected);
+    }
 }
