Skip to content

Commit

Permalink
Add host to ObjectStoreRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jun 30, 2022
1 parent ace05fd commit b6c069b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 23 deletions.
3 changes: 0 additions & 3 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ mod tests {
use std::sync::Arc;

use arrow::datatypes::Schema;
use object_store::local::LocalFileSystem;

use crate::assert_batches_eq;
use crate::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
Expand Down Expand Up @@ -170,8 +169,6 @@ mod tests {
let schema = MemorySchemaProvider::new();

let ctx = SessionContext::new();
let store = Arc::new(LocalFileSystem::new());
ctx.runtime_env().register_object_store("file", store);

let config = ListingTableConfig::new(table_path)
.infer(&ctx.state())
Expand Down
29 changes: 13 additions & 16 deletions datafusion/core/src/datasource/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl std::fmt::Display for ObjectStoreUrl {
/// Object store registry
pub struct ObjectStoreRegistry {
/// A map from scheme to the object store that serves list / read operations for that store
pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
}

impl std::fmt::Debug for ObjectStoreRegistry {
Expand All @@ -109,8 +109,7 @@ impl ObjectStoreRegistry {
/// A [`LocalFileSystem`] store is registered by default to support reading local files natively.
pub fn new() -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
map.insert("file".to_string(), Arc::new(LocalFileSystem::new()));

map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
object_stores: RwLock::new(map),
}
Expand All @@ -120,34 +119,32 @@ impl ObjectStoreRegistry {
/// If a store of the same prefix existed before, it is replaced in the registry and returned.
pub fn register_store(
&self,
scheme: String,
scheme: impl AsRef<str>,
host: impl AsRef<str>,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
let mut stores = self.object_stores.write();
stores.insert(scheme, store)
}

/// Get the store registered for scheme
pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
let stores = self.object_stores.read();
stores.get(scheme).cloned()
let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
stores.insert(s, store)
}

/// Get a suitable store for the provided URL. For example:
///
/// - URL with scheme `file://` or no scheme will return the default LocalFS store
/// - URL with scheme `s3://` will return the S3 store if it's registered
/// - URL with scheme `file:///` or no scheme will return the default LocalFS store
/// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
///
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
let store = self.get(url.scheme()).ok_or_else(|| {
let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
let stores = self.object_stores.read();
let store = stores.get(s).ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
url
))
})?;

Ok(store)
Ok(store.clone())
}
}

Expand Down Expand Up @@ -196,7 +193,7 @@ mod tests {
#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
sut.register_store("s3".to_string(), Arc::new(LocalFileSystem::new()));
sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
sut.get_by_url(&url).unwrap();
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ impl RuntimeEnv {
/// Returns the `ObjectStore` previously registered for this scheme and host, if any
pub fn register_object_store(
&self,
scheme: impl Into<String>,
scheme: impl AsRef<str>,
host: impl AsRef<str>,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
let scheme = scheme.into();
self.object_store_registry
.register_store(scheme, object_store)
.register_store(scheme, host, object_store)
}

/// Retrieves an `ObjectStore` instance for a URL
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
/// Registers a test object store with the provided `ctx`
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
ctx.runtime_env()
.register_object_store("test", make_test_store(files));
.register_object_store("test", "", make_test_store(files));
}

/// Create a test object store with the provided files
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ fn register_partitioned_aggregate_csv(
let file_schema = test_util::aggr_test_schema();
ctx.runtime_env().register_object_store(
"mirror",
"",
MirroringObjectStore::new_arc(csv_file_path, store_paths),
);

Expand Down Expand Up @@ -444,6 +445,7 @@ async fn register_partitioned_alltypes_parquet(
let parquet_file_path = format!("{}/{}", testdata, source_file);
ctx.runtime_env().register_object_store(
"mirror",
"",
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
);

Expand Down

0 comments on commit b6c069b

Please sign in to comment.