Skip to content

Commit

Permalink
feat(session): Replaces direct usage of DaftCatalog with Session (#3794)
Browse files Browse the repository at this point in the history
  • Loading branch information
rchowell authored Feb 13, 2025
1 parent 1ae9605 commit f9a4b70
Show file tree
Hide file tree
Showing 22 changed files with 420 additions and 236 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions daft/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ class Identifier(Sequence):
"""A reference (path) to a catalog object.
Example:
>>> id1 = Identifier("a", "b")
>>> id2 = Identifier.parse("a.b")
>>> id = Identifier("a", "b")
>>> assert len(id) == 2
"""

_identifier: native_catalog.PyIdentifier
Expand All @@ -174,8 +174,8 @@ def __init__(self, *parts: str):
"""Creates an Identifier from its parts.
Example:
>>> id = Identifier("schema", "table")
>>> id # Identifier('schema.table')
>>> Identifier("schema", "table")
>>> #
Returns:
Identifier: A new identifier.
Expand All @@ -185,18 +185,19 @@ def __init__(self, *parts: str):
self._identifier = native_catalog.PyIdentifier(parts[:-1], parts[-1])

@staticmethod
def parse(input: str) -> Identifier:
"""Parses an Identifier from an SQL string.
def from_sql(input: str, normalize: bool = False) -> Identifier:
"""Parses an Identifier from an SQL string, normalizing to lowercase if specified.
Example:
>>> id = Identifier.parse("schema.table")
>>> assert len(id) == 2
>>> Identifier.from_sql("schema.table") == Identifier("schema", "table")
>>> Identifier.from_sql('"a.b"') == Identifier("a.b")
>>> Identifier.from_sql('ABC."xYz"', normalize=True) == Identifier("abc", "xYz")
Returns:
Identifier: A new identifier.
"""
i = Identifier.__new__(Identifier)
i._identifier = native_catalog.PyIdentifier.parse(input)
i._identifier = native_catalog.PyIdentifier.from_sql(input, normalize)
return i

def __eq__(self, other: object) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion daft/daft/catalog.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if TYPE_CHECKING:
class PyIdentifier:
def __init__(self, namespace: tuple[str, ...], name: str): ...
@staticmethod
def parse(input: str): ...
def from_sql(input: str, normalize: bool): ...
def eq(self, other: PyIdentifier) -> bool: ...
def getitem(self, index: int) -> str: ...
def __len__(self) -> int: ...
Expand Down
43 changes: 43 additions & 0 deletions src/daft-catalog/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::{collections::HashMap, sync::Arc};

use crate::DataCatalog;

/// Catalogs is a collection of referenceable catalogs (glorified map).
///
/// The wrapped map is keyed by catalog name and holds a shared handle
/// (`Arc<dyn DataCatalog>`) to each catalog.
///
/// Notes:
/// - This is intentionally lightweight and everything is exact-case.
/// - All APIs are infallible because callers determine what is an error.
/// - It does not necessarily have map semantics.
/// - Callers are responsible for case-normalization, hence String and &str.
/// - Intentionally using String and &str rather than Into and AsRef.
#[derive(Debug)]
pub struct Catalogs(HashMap<String, Arc<dyn DataCatalog>>);

impl Catalogs {
/// Creates an empty catalogs collection
pub fn empty() -> Catalogs {
Self(HashMap::new())
}

/// Attaches a catalog to this catalog collection.
pub fn attach(&mut self, name: String, catalog: Arc<dyn DataCatalog>) {
self.0.insert(name, catalog);
}

/// Detaches a catalog from this catalog collection.
pub fn detach(&mut self, name: &str) {
self.0.remove(name);
}

/// Get the catalog by name.
pub fn get(&self, name: &str) -> Option<Arc<dyn DataCatalog>> {
self.0.get(name).map(Arc::clone)
}

/// Returns true iff a catalog with the given name exists (exact-case).
pub fn exists(&self, name: &str) -> bool {
self.0.contains_key(name)
}
}

// TODO Catalog trait for implementations.
46 changes: 33 additions & 13 deletions src/daft-catalog/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use snafu::Snafu;

use crate::Identifier;

/// Catalog Result
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Catalog Error
#[derive(Debug, Snafu)]
pub enum Error {
// TODO remove me
#[snafu(display(
"Failed to find specified table identifier {} in the requested catalog {}",
catalog_name,
Expand All @@ -14,10 +19,13 @@ pub enum Error {
table_id: String,
},

#[snafu(display("Catalog not found: {}", name))]
CatalogNotFound { name: String },
#[snafu(display("{type_} with name {ident} already exists!"))]
ObjectAlreadyExists { type_: String, ident: String },

#[snafu(display("{type_} with name {ident} not found!"))]
ObjectNotFound { type_: String, ident: String },

#[snafu(display("Invalid identifier `{input}`."))]
#[snafu(display("Invalid identifier {input}!"))]
InvalidIdentifier { input: String },

#[snafu(display("{message}"))]
Expand All @@ -33,21 +41,33 @@ pub enum Error {

impl Error {
#[inline]
pub fn unsupported(message: String) -> Error {
Error::Unsupported { message }
pub fn unsupported<S: Into<String>>(message: S) -> Error {
Error::Unsupported {
message: message.into(),
}
}

#[inline]
pub fn obj_already_exists<S: Into<String>>(type_: S, ident: &Identifier) -> Error {
Error::ObjectAlreadyExists {
type_: type_.into(),
ident: ident.to_string(),
}
}

// Consider typed arguments vs strings for consistent formatting.
#[inline]
pub fn obj_not_found<S: Into<String>>(typ_: S, ident: &Identifier) -> Error {
Error::ObjectNotFound {
type_: typ_.into(),
ident: ident.to_string(),
}
}
}

impl From<Error> for common_error::DaftError {
fn from(err: Error) -> Self {
match &err {
Error::TableNotFound { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidIdentifier { .. }
| Error::Unsupported { .. } => common_error::DaftError::CatalogError(err.to_string()),
#[cfg(feature = "python")]
Error::PythonError { .. } => common_error::DaftError::CatalogError(err.to_string()),
}
common_error::DaftError::CatalogError(err.to_string())
}
}

Expand Down
87 changes: 48 additions & 39 deletions src/daft-catalog/src/identifier.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{fmt::Display, iter::once, vec::IntoIter};
use std::fmt::Display;

use crate::error::{Error, Result};

/// An object's namespace (location).
/// A reference qualifier.
pub type Namespace = Vec<String>;

/// A reference (path) to some catalog object.
/// A reference to a catalog object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Identifier {
pub namespace: Namespace,
Expand All @@ -18,13 +18,21 @@ impl Identifier {
Self { namespace, name }
}

/// Returns true if this is a qualified identifier e.g. has a namespace.
/// Creates an identifier that consists of a name only; its namespace is empty.
pub fn simple<T: Into<String>>(name: T) -> Self {
    let name = name.into();
    Self {
        namespace: Vec::new(),
        name,
    }
}

/// Reports whether this identifier is qualified, i.e. its namespace has at least one part.
pub fn has_namespace(&self) -> bool {
    let unqualified = self.namespace.is_empty();
    !unqualified
}

/// Parses an identifier using sqlparser to validate the input.
pub fn parse(input: &str) -> Result<Identifier> {
pub fn from_sql(input: &str, normalize: bool) -> Result<Identifier> {
// TODO daft should define its own identifier domain.
use sqlparser::{dialect::PostgreSqlDialect, parser::Parser};
let err = Error::InvalidIdentifier {
Expand All @@ -33,58 +41,59 @@ impl Identifier {
let Ok(mut parser) = Parser::new(&PostgreSqlDialect {}).try_with_sql(input) else {
return Err(err);
};
let Ok(parts) = parser.parse_multipart_identifier() else {
let Ok(idents) = parser.parse_multipart_identifier() else {
return Err(err);
};
if parts.is_empty() {
if idents.is_empty() {
return Err(err);
}
// TODO Identifier normalization is omitted until further discussion.
let mut parts = parts
.iter()
.map(|part| part.value.to_string())
.collect::<Vec<String>>();
let name = parts.pop().unwrap();
let namespace = parts;
// Convert sqlparser identifiers applying normalization if given.
let mut names: Vec<String> = vec![];
for ident in idents {
if normalize && ident.quote_style.is_none() {
names.push(ident.value.to_lowercase());
} else {
names.push(ident.value);
}
}
let name = names.pop().unwrap();
let namespace = names;
Ok(Identifier::new(namespace, name))
}
}

impl Display for Identifier {
/// Joins the identifier to a dot-delimited path.
/// Returns the identifier as a dot-delimited string with double-quoted parts.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.namespace.is_empty() {
f.write_str(&self.name)
write!(f, "{}", escape_double_quotes(&self.name))
} else {
let prefix = self.namespace.join(".");
let string = format!("{}.{}", prefix, self.name);
f.write_str(&string)
let namespace = self
.namespace
.iter()
.map(|n| escape_double_quotes(n))
.collect::<Vec<String>>()
.join(".");
let name = escape_double_quotes(&self.name);
write!(f, "{}.{}", namespace, name)
}
}
}

impl IntoIterator for Identifier {
type Item = String;
type IntoIter = IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.namespace
.into_iter()
.chain(once(self.name))
.collect::<Vec<String>>()
.into_iter()
impl From<String> for Identifier {
/// Returns an unqualified delimited identifier.
fn from(name: String) -> Self {
Self::simple(name)
}
}

impl<'a> IntoIterator for &'a Identifier {
type Item = &'a String;
type IntoIter = IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.namespace
.iter()
.chain(once(&self.name))
.collect::<Vec<&String>>()
.into_iter()
impl From<&str> for Identifier {
    /// Copies a borrowed name into an unqualified delimited identifier.
    fn from(name: &str) -> Self {
        let owned = String::from(name);
        Self::simple(owned)
    }
}

/// Returns a copy of `text` in which every double quote (`"`) is doubled (`""`).
///
/// All other characters are copied through unchanged.
fn escape_double_quotes(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        if ch == '"' {
            escaped.push_str("\"\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
28 changes: 13 additions & 15 deletions src/daft-catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod catalog;
mod data_catalog;
mod data_catalog_table;
pub mod error;
pub mod identifier;

mod identifier;
// Export public-facing traits
use std::{collections::HashMap, default, sync::Arc};

Expand All @@ -14,6 +14,7 @@ pub use data_catalog_table::DataCatalogTable;
pub mod python;

use error::{Error, Result};
pub use identifier::*;

pub mod global_catalog {
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -43,7 +44,6 @@ pub mod global_catalog {
.unregister_catalog(name)
}
}
use identifier::Identifier;

/// Name of the default catalog
static DEFAULT_CATALOG_NAME: &str = "default";
Expand Down Expand Up @@ -103,13 +103,8 @@ impl DaftCatalog {
name: &str,
view: impl Into<LogicalPlanBuilder>,
) -> Result<()> {
let identifier = Identifier::parse(name)?;
if identifier.has_namespace() {
return Err(Error::Unsupported {
message: format!("Qualified identifiers are not yet supported. Instead use a single identifier, or wrap your table name in quotes such as `\"{}\"`", name),
});
}
self.named_tables.insert(identifier.name, view.into());
// TODO this API is being removed, for now preserve the exact name as if it were delimited.
self.named_tables.insert(name.into(), view.into());
Ok(())
}

Expand Down Expand Up @@ -159,6 +154,7 @@ impl DaftCatalog {
table_id: searched_table_name.to_string(),
})
}

/// Copy from another catalog, using tables from other in case of conflict
pub fn copy_from(&mut self, other: &Self) {
for (name, plan) in &other.named_tables {
Expand All @@ -168,6 +164,13 @@ impl DaftCatalog {
self.data_catalogs.insert(name.clone(), catalog.clone());
}
}

/// Consumes this catalog and returns its map of named data catalogs.
///
/// TODO: remove once the py register and read methods are moved to session.
/// DaftMetaCatalog (presumably this `DaftCatalog` type; confirm) cannot be
/// removed until the dependency is inverted so that the current `register_`
/// methods use the session rather than the catalog.
pub fn into_catalog_map(self) -> HashMap<String, Arc<dyn DataCatalog>> {
self.data_catalogs
}
}

#[cfg(test)]
Expand Down Expand Up @@ -208,11 +211,6 @@ mod tests {

// Register a table
assert!(catalog.register_table("test_table", plan.clone()).is_ok());

// Try to register a table with invalid name
assert!(catalog
.register_table("invalid name", plan.clone())
.is_err());
}

#[test]
Expand Down
Loading

0 comments on commit f9a4b70

Please sign in to comment.