diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 4ed9741a536d..7634328f323e 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -132,7 +132,6 @@ mod tests { use std::sync::Arc; use arrow::datatypes::Schema; - use object_store::local::LocalFileSystem; use crate::assert_batches_eq; use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider}; @@ -170,8 +169,6 @@ mod tests { let schema = MemorySchemaProvider::new(); let ctx = SessionContext::new(); - let store = Arc::new(LocalFileSystem::new()); - ctx.runtime_env().register_object_store("file", store); let config = ListingTableConfig::new(table_path) .infer(&ctx.state()) diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index a28a1d15beaf..aca5b0ca412e 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -84,7 +84,7 @@ impl std::fmt::Display for ObjectStoreUrl { /// Object store registry pub struct ObjectStoreRegistry { /// A map from scheme to object store that serve list / read operations for the store - pub object_stores: RwLock>>, + object_stores: RwLock>>, } impl std::fmt::Debug for ObjectStoreRegistry { @@ -109,8 +109,7 @@ impl ObjectStoreRegistry { /// ['LocalFileSystem'] store is registered in by default to support read local files natively. pub fn new() -> Self { let mut map: HashMap> = HashMap::new(); - map.insert("file".to_string(), Arc::new(LocalFileSystem::new())); - + map.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); Self { object_stores: RwLock::new(map), } @@ -120,34 +119,32 @@ impl ObjectStoreRegistry { /// If a store of the same prefix existed before, it is replaced in the registry and returned. pub fn register_store( &self, - scheme: String, + scheme: impl AsRef, + host: impl AsRef, store: Arc, ) -> Option> { let mut stores = self.object_stores.write(); - stores.insert(scheme, store) - } - - /// Get the store registered for scheme - pub fn get(&self, scheme: &str) -> Option> { - let stores = self.object_stores.read(); - stores.get(scheme).cloned() + let s = format!("{}://{}", scheme.as_ref(), host.as_ref()); + stores.insert(s, store) } /// Get a suitable store for the provided URL. For example: /// - /// - URL with scheme `file://` or no schema will return the default LocalFS store - /// - URL with scheme `s3://` will return the S3 store if it's registered + /// - URL with scheme `file:///` or no schema will return the default LocalFS store + /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered /// pub fn get_by_url(&self, url: impl AsRef) -> Result> { let url = url.as_ref(); - let store = self.get(url.scheme()).ok_or_else(|| { + let s = &url[url::Position::BeforeScheme..url::Position::AfterHost]; + let stores = self.object_stores.read(); + let store = stores.get(s).ok_or_else(|| { DataFusionError::Internal(format!( "No suitable object store found for {}", url )) })?; - Ok(store) + Ok(store.clone()) } } @@ -196,7 +193,7 @@ mod tests { #[test] fn test_get_by_url_s3() { let sut = ObjectStoreRegistry::default(); - sut.register_store("s3".to_string(), Arc::new(LocalFileSystem::new())); + sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new())); let url = ListingTableUrl::parse("s3://bucket/key").unwrap(); sut.get_by_url(&url).unwrap(); } diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index 35ca4b6a5d09..d810c882fb09 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -92,12 +92,12 @@ impl RuntimeEnv { /// Returns the `ObjectStore` previously registered for this scheme, if any pub fn register_object_store( &self, - scheme: impl Into, + scheme: impl AsRef, + host: impl AsRef, object_store: Arc, ) -> Option> { - let scheme = scheme.into(); self.object_store_registry - .register_store(scheme, object_store) + .register_store(scheme, host, object_store) } /// Retrieves a `ObjectStore` instance for a url diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index a56d3396e4da..e61653e882a1 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -23,7 +23,7 @@ use std::sync::Arc; /// Returns a test object store with the provided `ctx` pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) { ctx.runtime_env() - .register_object_store("test", make_test_store(files)); + .register_object_store("test", "", make_test_store(files)); } /// Create a test object store with the provided files diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 42ff28426f73..a88445d781a9 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -417,6 +417,7 @@ fn register_partitioned_aggregate_csv( let file_schema = test_util::aggr_test_schema(); ctx.runtime_env().register_object_store( "mirror", + "", MirroringObjectStore::new_arc(csv_file_path, store_paths), ); @@ -444,6 +445,7 @@ async fn register_partitioned_alltypes_parquet( let parquet_file_path = format!("{}/{}", testdata, source_file); ctx.runtime_env().register_object_store( "mirror", + "", MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths), );