Skip to content

Commit 0429aa7

Browse files
authored
feat: change dataset uri to return fully qualified URL instead of object store path (lancedb#2416)
1 parent 7153ddc commit 0429aa7

File tree

10 files changed

+115
-61
lines changed

10 files changed

+115
-61
lines changed

Cargo.toml

+16-16
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ exclude = ["python"]
2020
resolver = "2"
2121

2222
[workspace.package]
23-
version = "0.11.2"
23+
version = "0.12.0"
2424
edition = "2021"
2525
authors = ["Lance Devs <dev@lancedb.com>"]
2626
license = "Apache-2.0"
@@ -43,20 +43,20 @@ categories = [
4343
rust-version = "1.75"
4444

4545
[workspace.dependencies]
46-
lance = { version = "=0.11.2", path = "./rust/lance" }
47-
lance-arrow = { version = "=0.11.2", path = "./rust/lance-arrow" }
48-
lance-core = { version = "=0.11.2", path = "./rust/lance-core" }
49-
lance-datafusion = { version = "=0.11.2", path = "./rust/lance-datafusion" }
50-
lance-datagen = { version = "=0.11.2", path = "./rust/lance-datagen" }
51-
lance-encoding = { version = "=0.11.2", path = "./rust/lance-encoding" }
52-
lance-encoding-datafusion = { version = "=0.11.2", path = "./rust/lance-encoding-datafusion" }
53-
lance-file = { version = "=0.11.2", path = "./rust/lance-file" }
54-
lance-index = { version = "=0.11.2", path = "./rust/lance-index" }
55-
lance-io = { version = "=0.11.2", path = "./rust/lance-io" }
56-
lance-linalg = { version = "=0.11.2", path = "./rust/lance-linalg" }
57-
lance-table = { version = "=0.11.2", path = "./rust/lance-table" }
58-
lance-test-macros = { version = "=0.11.2", path = "./rust/lance-test-macros" }
59-
lance-testing = { version = "=0.11.2", path = "./rust/lance-testing" }
46+
lance = { version = "=0.12.0", path = "./rust/lance" }
47+
lance-arrow = { version = "=0.12.0", path = "./rust/lance-arrow" }
48+
lance-core = { version = "=0.12.0", path = "./rust/lance-core" }
49+
lance-datafusion = { version = "=0.12.0", path = "./rust/lance-datafusion" }
50+
lance-datagen = { version = "=0.12.0", path = "./rust/lance-datagen" }
51+
lance-encoding = { version = "=0.12.0", path = "./rust/lance-encoding" }
52+
lance-encoding-datafusion = { version = "=0.12.0", path = "./rust/lance-encoding-datafusion" }
53+
lance-file = { version = "=0.12.0", path = "./rust/lance-file" }
54+
lance-index = { version = "=0.12.0", path = "./rust/lance-index" }
55+
lance-io = { version = "=0.12.0", path = "./rust/lance-io" }
56+
lance-linalg = { version = "=0.12.0", path = "./rust/lance-linalg" }
57+
lance-table = { version = "=0.12.0", path = "./rust/lance-table" }
58+
lance-test-macros = { version = "=0.12.0", path = "./rust/lance-test-macros" }
59+
lance-testing = { version = "=0.12.0", path = "./rust/lance-testing" }
6060
approx = "0.5.1"
6161
# Note that this one does not include pyarrow
6262
arrow = { version = "51.0.0", optional = false, features = ["prettyprint"] }
@@ -130,7 +130,7 @@ rustc_version = "0.4"
130130
serde = { version = "^1" }
131131
serde_json = { version = "1" }
132132
shellexpand = "3.0"
133-
snafu = "0.7.4"
133+
snafu = "0.7.5"
134134
tempfile = "3"
135135
test-log = { version = "0.2.15" }
136136
tokio = { version = "1.23", features = [

python/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pylance"
3-
version = "0.11.2"
3+
version = "0.12.0"
44
edition = "2021"
55
authors = ["Lance Devs <dev@lancedb.com>"]
66
rust-version = "1.65"

python/src/fragment.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ impl FragmentMetadata {
432432

433433
#[pyfunction(name = "_cleanup_partial_writes")]
434434
pub fn cleanup_partial_writes(base_uri: &str, files: Vec<(String, String)>) -> PyResult<()> {
435-
let (store, _) = RT
435+
let (store, base_path) = RT
436436
.runtime
437437
.block_on(ObjectStore::from_uri(base_uri))
438438
.map_err(|err| PyIOError::new_err(format!("Failed to create object store: {}", err)))?;
@@ -443,15 +443,19 @@ pub fn cleanup_partial_writes(base_uri: &str, files: Vec<(String, String)>) -> P
443443
.collect();
444444

445445
#[allow(clippy::map_identity)]
446-
async fn inner(store: ObjectStore, files: Vec<(Path, String)>) -> Result<(), ::lance::Error> {
446+
async fn inner(
447+
store: ObjectStore,
448+
base_path: Path,
449+
files: Vec<(Path, String)>,
450+
) -> Result<(), ::lance::Error> {
447451
let files_iter = files
448452
.iter()
449453
.map(|(path, multipart_id)| (path, multipart_id));
450-
lance::dataset::cleanup::cleanup_partial_writes(&store, files_iter).await
454+
lance::dataset::cleanup::cleanup_partial_writes(&store, &base_path, files_iter).await
451455
}
452456

453457
RT.runtime
454-
.block_on(inner(store, files))
458+
.block_on(inner(store, base_path, files))
455459
.map_err(|err| PyIOError::new_err(format!("Failed to cleanup files: {}", err)))
456460
}
457461

rust/lance-index/src/vector/hnsw/builder.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ mod tests {
766766
use lance_linalg::distance::DistanceType;
767767
use lance_table::format::SelfDescribingFileReader;
768768
use lance_testing::datagen::generate_random_array;
769+
use object_store::path::Path;
769770

770771
use crate::vector::{
771772
flat::storage::FlatStorage,
@@ -792,7 +793,7 @@ mod tests {
792793
.unwrap();
793794

794795
let object_store = ObjectStore::memory();
795-
let path = object_store.base_path().child("test_builder_write_load");
796+
let path = Path::from("test_builder_write_load");
796797
let writer = object_store.create(&path).await.unwrap();
797798
let schema = Schema::new(vec![
798799
VECTOR_ID_FIELD.clone(),

rust/lance-io/src/object_store.rs

+25-22
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ pub struct ObjectStore {
8484
// Inner object store
8585
pub inner: Arc<dyn OSObjectStore>,
8686
scheme: String,
87-
base_path: Path,
8887
block_size: usize,
8988
}
9089

@@ -94,9 +93,7 @@ impl DeepSizeOf for ObjectStore {
9493
// shouldn't be too big. The only exception might be the write cache but, if
9594
// the writer cache has data, it means we're using it somewhere else that isn't
9695
// a cache and so that doesn't really count.
97-
self.scheme.deep_size_of_children(context)
98-
+ self.base_path.as_ref().deep_size_of_children(context)
99-
+ self.block_size.deep_size_of_children(context)
96+
self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
10097
}
10198
}
10299

@@ -356,15 +353,14 @@ impl ObjectStore {
356353
uri: &str,
357354
params: &ObjectStoreParams,
358355
) -> Result<(Self, Path)> {
359-
let (object_store, base_path) = match Url::parse(uri) {
356+
let (object_store, path) = match Url::parse(uri) {
360357
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
361358
// On Windows, the drive is parsed as a scheme
362359
Self::from_path(uri)
363360
}
364361
Ok(url) => {
365362
let store = Self::new_from_url(url.clone(), params.clone()).await?;
366-
let path = Path::from(url.path());
367-
Ok((store, path))
363+
Ok((store, Path::from(url.path())))
368364
}
369365
Err(_) => Self::from_path(uri),
370366
}?;
@@ -378,7 +374,7 @@ impl ObjectStore {
378374
.unwrap_or(object_store.inner),
379375
..object_store
380376
},
381-
base_path,
377+
path,
382378
))
383379
}
384380

@@ -399,7 +395,6 @@ impl ObjectStore {
399395
Self {
400396
inner: Arc::new(LocalFileSystem::new()).traced(),
401397
scheme: String::from(scheme),
402-
base_path: Path::from_absolute_path(expanded_path.as_path())?,
403398
block_size: 4 * 1024, // 4KB block size
404399
},
405400
Path::from_absolute_path(expanded_path.as_path())?,
@@ -419,7 +414,6 @@ impl ObjectStore {
419414
Self {
420415
inner: Arc::new(LocalFileSystem::new()).traced(),
421416
scheme: String::from("file"),
422-
base_path: Path::from("/"),
423417
block_size: 4 * 1024, // 4KB block size
424418
}
425419
}
@@ -429,7 +423,6 @@ impl ObjectStore {
429423
Self {
430424
inner: Arc::new(InMemory::new()).traced(),
431425
scheme: String::from("memory"),
432-
base_path: Path::from("/"),
433426
block_size: 64 * 1024,
434427
}
435428
}
@@ -447,10 +440,6 @@ impl ObjectStore {
447440
self.block_size = new_size;
448441
}
449442

450-
pub fn base_path(&self) -> &Path {
451-
&self.base_path
452-
}
453-
454443
/// Open a file for path.
455444
///
456445
/// Parameters
@@ -747,11 +736,9 @@ async fn configure_store(url: &str, options: ObjectStoreParams) -> Result<Object
747736
Ok(ObjectStore {
748737
inner: Arc::new(store),
749738
scheme: String::from(url.scheme()),
750-
base_path: Path::from(url.path()),
751739
block_size: 64 * 1024,
752740
})
753741
}
754-
755742
"gs" => {
756743
storage_options.with_env_gcs();
757744
let mut builder = GoogleCloudStorageBuilder::new().with_url(url.as_ref());
@@ -763,23 +750,21 @@ async fn configure_store(url: &str, options: ObjectStoreParams) -> Result<Object
763750
// object_store 0.10.0 is available.
764751
let store = PatchedGoogleCloudStorage(Arc::new(store));
765752
let store = Arc::new(store);
753+
766754
Ok(ObjectStore {
767755
inner: store,
768756
scheme: String::from("gs"),
769-
base_path: Path::from(url.path()),
770757
block_size: 64 * 1024,
771758
})
772759
}
773760
"az" => {
774761
storage_options.with_env_azure();
775-
776762
let (store, _) = parse_url_opts(&url, storage_options.as_azure_options())?;
777763
let store = Arc::new(store);
778764

779765
Ok(ObjectStore {
780766
inner: store,
781767
scheme: String::from("az"),
782-
base_path: Path::from(url.path()),
783768
block_size: 64 * 1024,
784769
})
785770
}
@@ -794,7 +779,6 @@ async fn configure_store(url: &str, options: ObjectStoreParams) -> Result<Object
794779
"memory" => Ok(ObjectStore {
795780
inner: Arc::new(InMemory::new()).traced(),
796781
scheme: String::from("memory"),
797-
base_path: Path::from(url.path()),
798782
block_size: 64 * 1024,
799783
}),
800784
unknow_scheme => {
@@ -824,7 +808,6 @@ impl ObjectStore {
824808
Self {
825809
inner: store,
826810
scheme: scheme.into(),
827-
base_path: location.path().into(),
828811
block_size,
829812
}
830813
}
@@ -969,6 +952,26 @@ mod tests {
969952
}
970953
}
971954

955+
#[tokio::test]
956+
async fn test_cloud_paths() {
957+
let uri = "s3://bucket/foo.lance";
958+
let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
959+
assert_eq!(store.scheme, "s3");
960+
assert_eq!(path.to_string(), "foo.lance");
961+
962+
let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
963+
.await
964+
.unwrap();
965+
assert_eq!(store.scheme, "s3");
966+
assert_eq!(path.to_string(), "foo.lance");
967+
968+
let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
969+
.await
970+
.unwrap();
971+
assert_eq!(store.scheme, "gs");
972+
assert_eq!(path.to_string(), "foo.lance");
973+
}
974+
972975
#[tokio::test]
973976
async fn test_relative_paths() {
974977
let tmp_dir = tempfile::tempdir().unwrap();

rust/lance/src/dataset.rs

+47-3
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ pub(crate) const DEFAULT_METADATA_CACHE_SIZE: usize = 256;
8484
pub struct Dataset {
8585
pub(crate) object_store: Arc<ObjectStore>,
8686
pub(crate) commit_handler: Arc<dyn CommitHandler>,
87+
/// Uri of the dataset.
88+
///
89+
/// On cloud storage, we cannot use [Dataset::base] to build the full URI because the
90+
/// `bucket` is swallowed in the inner [ObjectStore].
91+
uri: String,
8792
pub(crate) base: Path,
8893
pub(crate) manifest: Arc<Manifest>,
8994
pub(crate) session: Arc<Session>,
@@ -282,6 +287,7 @@ impl Dataset {
282287
Self::checkout_manifest(
283288
self.object_store.clone(),
284289
base_path,
290+
self.uri.to_string(),
285291
&manifest_location,
286292
self.session.clone(),
287293
self.commit_handler.clone(),
@@ -292,6 +298,7 @@ impl Dataset {
292298
async fn checkout_manifest(
293299
object_store: Arc<ObjectStore>,
294300
base_path: Path,
301+
uri: String,
295302
manifest_location: &ManifestLocation,
296303
session: Arc<Session>,
297304
commit_handler: Arc<dyn CommitHandler>,
@@ -354,6 +361,7 @@ impl Dataset {
354361
Ok(Self {
355362
object_store,
356363
base: base_path,
364+
uri,
357365
manifest: Arc::new(manifest),
358366
commit_handler,
359367
session,
@@ -495,6 +503,7 @@ impl Dataset {
495503
Ok(Self {
496504
object_store,
497505
base,
506+
uri: uri.to_string(),
498507
manifest: Arc::new(manifest.clone()),
499508
session: Arc::new(Session::default()),
500509
commit_handler,
@@ -590,9 +599,9 @@ impl Dataset {
590599
self.append_impl(batches, params).await
591600
}
592601

593-
/// Get the base URI of the dataset.
594-
pub fn uri(&self) -> &Path {
595-
&self.base
602+
/// Get the fully qualified URI of this dataset.
603+
pub fn uri(&self) -> &str {
604+
&self.uri
596605
}
597606

598607
/// Get the full manifest of the dataset version.
@@ -789,6 +798,7 @@ impl Dataset {
789798
Ok(Self {
790799
object_store: Arc::new(object_store),
791800
base,
801+
uri: base_uri.to_string(),
792802
manifest: Arc::new(manifest.clone()),
793803
session: Arc::new(Session::default()),
794804
commit_handler,
@@ -3378,4 +3388,38 @@ mod tests {
33783388

33793389
assert!(!dataset_dir.exists());
33803390
}
3391+
3392+
#[tokio::test]
3393+
async fn test_dataset_uri_roundtrips() {
3394+
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
3395+
"a",
3396+
DataType::Int32,
3397+
false,
3398+
)]));
3399+
3400+
let test_dir = tempdir().unwrap();
3401+
let test_uri = test_dir.path().to_str().unwrap();
3402+
let vectors = Arc::new(Int32Array::from_iter_values(vec![]));
3403+
3404+
let data = RecordBatch::try_new(schema.clone(), vec![vectors]);
3405+
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
3406+
let dataset = Dataset::write(
3407+
reader,
3408+
test_uri,
3409+
Some(WriteParams {
3410+
..Default::default()
3411+
}),
3412+
)
3413+
.await
3414+
.unwrap();
3415+
3416+
let uri = dataset.uri();
3417+
assert_eq!(uri, test_uri);
3418+
3419+
let ds2 = Dataset::open(uri).await.unwrap();
3420+
assert_eq!(
3421+
ds2.latest_version_id().await.unwrap(),
3422+
dataset.latest_version_id().await.unwrap()
3423+
);
3424+
}
33813425
}

0 commit comments

Comments
 (0)