diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index 4837bbcc6..9cb75c7ae 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -44,6 +44,10 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
 
   private var catalogName: String = null
 
+  /**
+   * Gets the delegated catalog of the session
+   * @return the delegated session catalog
+   */
   private def getDelegatedCatalog(): T = {
     val sessionCatalog = delegatedCatalog match {
       case null =>
@@ -54,6 +58,17 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
     sessionCatalog.asInstanceOf[T]
   }
 
+  /**
+   * Gets the session catalog that matches the "provider" property, if any
+   *
+   * The intention is to support the different catalog providers
+   * as we add integrations with their formats.
+   * For example, for the "delta" provider it returns a DeltaCatalog instance.
+   *
+   * In this way, users only need to instantiate a single unified catalog.
+   * @param properties the table properties, which may include a "provider" entry
+   * @return the session catalog for the given provider
+   */
   private def getSessionCatalog(properties: Map[String, String] = Map.empty): T = {
     properties.get("provider") match {
       case Some("delta") => deltaCatalog.asInstanceOf[T]
@@ -224,6 +239,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
     // Initialize the catalog with the corresponding name
     this.catalogName = name
+    // Also initialize any other provider catalog that we integrate with
     this.deltaCatalog.initialize(name, options)
   }
 
@@ -233,6 +249,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
     // Check if the delegating catalog has Table and SupportsNamespace properties
     if (delegate.isInstanceOf[TableCatalog] && delegate.isInstanceOf[SupportsNamespaces]) {
       this.delegatedCatalog = delegate
+      // Also set the delegate on any other provider catalog that we integrate with
       this.deltaCatalog.setDelegateCatalog(delegate)
     } else throw new IllegalArgumentException("Invalid session catalog: " + delegate)
   }
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
index 034352af9..ac69fffc6 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -7,7 +7,7 @@ import org.apache.spark.sql.AnalysisException
 class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with CatalogTestSuite {
 
   "QbeastCatalog" should
-    "coexist with Delta Catalog" in withTmpDir(tmpDir =>
+    "coexist with Delta tables" in withTmpDir(tmpDir =>
       withExtendedSpark(sparkConf = new SparkConf()
         .setMaster("local[8]")
         .set("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")
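
For context, a minimal sketch of how the unified catalog documented in getSessionCatalog above would be used. It is illustrative, not part of the patch: the table names are hypothetical, it assumes qbeast-spark and Delta are on the classpath, and it relies on the standard Spark 3 spark.sql.catalog.spark_catalog setting to install QbeastCatalog as the session catalog (columnsToIndex is qbeast-spark's indexing option).

import org.apache.spark.sql.SparkSession

// Register QbeastCatalog as the session catalog. Because it delegates to
// DeltaCatalog whenever a table declares the "delta" provider, a single
// catalog entry serves both formats.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .config(
    "spark.sql.catalog.spark_catalog",
    "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()

// Both statements resolve through the same catalog: the first routes to the
// DeltaCatalog instance, the second is handled as a Qbeast table.
spark.sql("CREATE TABLE students_delta (id INT, name STRING) USING delta")
spark.sql(
  "CREATE TABLE students_qbeast (id INT, name STRING) USING qbeast " +
    "TBLPROPERTIES ('columnsToIndex' = 'id')")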