From 9bd33053d78eecd87e679382c12906d1a89549ae Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 12 Dec 2022 13:43:36 +0100
Subject: [PATCH 1/4] Add DeltaCatalog in QbeastCatalog to delegate delta
 format writes

---
 .../sources/catalog/QbeastCatalog.scala       | 38 ++++++++++++++-----
 .../QbeastCatalogIntegrationTest.scala        | 30 ++++++++++++++-
 2 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index d2ddf4566..4837bbcc6 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -16,6 +16,7 @@ import org.apache.spark.sql.catalyst.analysis.{
 import org.apache.spark.sql.{SparkCatalogUtils, SparkSession}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -37,21 +38,29 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
 
   private val tableFactory = QbeastContext.indexedTableFactory
 
+  private val deltaCatalog: DeltaCatalog = new DeltaCatalog()
+
   private var delegatedCatalog: CatalogPlugin = null
 
   private var catalogName: String = null
 
-  private def getSessionCatalog(): T = {
+  private def getDelegatedCatalog(): T = {
     val sessionCatalog = delegatedCatalog match {
       case null =>
         // In this case, any catalog has been delegated, so we need to search for the default
         SparkCatalogUtils.getV2SessionCatalog(SparkSession.active)
       case o => o
     }
-
     sessionCatalog.asInstanceOf[T]
   }
 
+  private def getSessionCatalog(properties: Map[String, String] = Map.empty): T = {
+    properties.get("provider") match {
+      case Some("delta") => deltaCatalog.asInstanceOf[T]
+      case _ => getDelegatedCatalog()
+    }
+  }
+
   override def loadTable(ident: Identifier): Table = {
     try {
       getSessionCatalog().loadTable(ident) match {
@@ -93,7 +102,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
       // Load the table
       loadTable(ident)
     } else {
-      getSessionCatalog().createTable(ident, schema, partitions, properties)
+      getSessionCatalog(properties.asScala.toMap).createTable(
+        ident,
+        schema,
+        partitions,
+        properties)
     }
 
   }
@@ -119,12 +132,13 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
         properties,
         tableFactory)
     } else {
-      if (getSessionCatalog().tableExists(ident)) {
-        getSessionCatalog().dropTable(ident)
+      val sessionCatalog = getSessionCatalog(properties.asScala.toMap)
+      if (sessionCatalog.tableExists(ident)) {
+        sessionCatalog.dropTable(ident)
       }
       DefaultStagedTable(
         ident,
-        getSessionCatalog().createTable(ident, schema, partitions, properties),
+        sessionCatalog.createTable(ident, schema, partitions, properties),
         this)
     }
   }
@@ -143,12 +157,13 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
         properties,
         tableFactory)
     } else {
-      if (getSessionCatalog().tableExists(ident)) {
-        getSessionCatalog().dropTable(ident)
+      val sessionCatalog = getSessionCatalog(properties.asScala.toMap)
+      if (sessionCatalog.tableExists(ident)) {
+        sessionCatalog.dropTable(ident)
       }
       DefaultStagedTable(
         ident,
-        getSessionCatalog().createTable(ident, schema, partitions, properties),
+        sessionCatalog.createTable(ident, schema, partitions, properties),
         this)
     }
 
@@ -170,7 +185,8 @@ class QbeastCatalog[T <: TableCatalog with
SupportsNamespaces] } else { DefaultStagedTable( ident, - getSessionCatalog().createTable(ident, schema, partitions, properties), + getSessionCatalog(properties.asScala.toMap) + .createTable(ident, schema, partitions, properties), this) } } @@ -208,6 +224,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces] override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { // Initialize the catalog with the corresponding name this.catalogName = name + this.deltaCatalog.initialize(name, options) } override def name(): String = catalogName @@ -216,6 +233,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces] // Check if the delegating catalog has Table and SupportsNamespace properties if (delegate.isInstanceOf[TableCatalog] && delegate.isInstanceOf[SupportsNamespaces]) { this.delegatedCatalog = delegate + this.deltaCatalog.setDelegateCatalog(delegate) } else throw new IllegalArgumentException("Invalid session catalog: " + delegate) } diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala index a62a21dca..034352af9 100644 --- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala @@ -7,7 +7,7 @@ import org.apache.spark.sql.AnalysisException class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with CatalogTestSuite { "QbeastCatalog" should - "coexist with Delta tables" in withTmpDir(tmpDir => + "coexist with Delta Catalog" in withTmpDir(tmpDir => withExtendedSpark(sparkConf = new SparkConf() .setMaster("local[8]") .set("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension") @@ -21,7 +21,6 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo data.write.format("delta").saveAsTable("delta_table") // delta catalog - // spark.sql("USE CATALOG qbeast_catalog") data.write .format("qbeast") .option("columnsToIndex", "id") @@ -41,6 +40,33 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo })) + it should + "coexist with Delta tables in the same catalog" in withQbeastContextSparkAndTmpWarehouse( + (spark, _) => { + + val data = createTestData(spark) + + data.write.format("delta").saveAsTable("delta_table") // delta catalog + + data.write + .format("qbeast") + .option("columnsToIndex", "id") + .saveAsTable("qbeast_table") // qbeast catalog + + val tables = spark.sessionState.catalog.listTables("default") + tables.size shouldBe 2 + + val deltaTable = spark.read.table("delta_table") + val qbeastTable = spark.read.table("qbeast_table") + + assertSmallDatasetEquality( + deltaTable, + qbeastTable, + orderedComparison = false, + ignoreNullable = true) + + }) + it should "crate table" in withQbeastContextSparkAndTmpWarehouse((spark, _) => { spark.sql( From 83e3e9a4bebf7411fd4cb276be9cf3e5834cd756 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Mon, 12 Dec 2022 15:23:02 +0100 Subject: [PATCH 2/4] Updating Docs version and adding information about Catalogs --- README.md | 2 +- docs/AdvancedConfiguration.md | 50 ++++++++++++++++++++++++++++++ docs/Quickstart.md | 58 +++++++++++++++++++++++++---------- 3 files changed, 93 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 545ddf3c8..888504cd8 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ export 
SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
 
 $SPARK_HOME/bin/spark-shell \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
 --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
---packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
+--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.0.0
 ```
 
 ### 2. Indexing a dataset
diff --git a/docs/AdvancedConfiguration.md b/docs/AdvancedConfiguration.md
index 0a1e3d1e2..1ca38bec7 100644
--- a/docs/AdvancedConfiguration.md
+++ b/docs/AdvancedConfiguration.md
@@ -2,6 +2,56 @@
 There's different configurations for the index that can affect the performance on read or the writing process. Here is a resume of some of them.
 
+## Catalogs
+
+We designed the `QbeastCatalog` to work as an **entry point for other formats' catalogs** as well.
+
+However, you can also handle different catalogs simultaneously.
+
+### 1. Unified Catalog
+
+```bash
+--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
+```
+
+Using the `spark_catalog` configuration, you can write **qbeast** and **delta** (or upcoming formats) tables into the `default` namespace.
+
+```scala
+df.write
+  .format("qbeast")
+  .option("columnsToIndex", "user_id,product_id")
+  .saveAsTable("qbeast_table")
+
+df.write
+  .format("delta")
+  .saveAsTable("delta_table")
+```
+
+### 2. Secondary catalog
+
+To use **more than one catalog in the same session**, you can set it up under a different name.
+
+```bash
+--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
+--conf spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
+```
+
+Notice that the `QbeastCatalog` configuration key is no longer `spark_catalog`; it now has a custom name such as `qbeast_catalog`. Each table written using the **qbeast** implementation should be prefixed with `qbeast_catalog`.
+
+For example:
+
+```scala
+// DataFrame API
+df.write
+  .format("qbeast")
+  .option("columnsToIndex", "user_id,product_id")
+  .saveAsTable("qbeast_catalog.default.qbeast_table")
+
+// SQL
+spark.sql("CREATE TABLE qbeast_catalog.default.qbeast_table USING qbeast AS SELECT * FROM ecommerce")
+```
+
+
 ## ColumnsToIndex
 
 These are the columns you want to index. Try to find those which are interesting for your queries, or your data pipelines.
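For illustration only, a minimal write that sets `columnsToIndex` explicitly might look like the sketch below; the input path and column names are hypothetical, and it assumes an active `SparkSession` with the qbeast-spark package on the classpath. Pick the columns your queries filter or sample on most often.

```scala
// Sketch with hypothetical names: index the columns most used in filters and sampling.
val events = spark.read.parquet("/tmp/events") // hypothetical input data

events.write
  .format("qbeast")
  .option("columnsToIndex", "user_id,price") // hypothetical columns to index
  .option("cubeSize", "100000")              // optional: target records per cube/file
  .save("/tmp/qbeast/events")                // hypothetical output path
```

Columns that appear in selective filters or range predicates (ids, timestamps, numeric measures) usually make better candidates than free-text fields.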
diff --git a/docs/Quickstart.md b/docs/Quickstart.md index 29c39bf6e..73b3ea92a 100644 --- a/docs/Quickstart.md +++ b/docs/Quickstart.md @@ -16,13 +16,8 @@ Inside the project folder, launch a spark-shell with the required **dependencies ```bash $SPARK_HOME/bin/spark-shell \ --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \ ---conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \ ---packages io.qbeast:qbeast-spark_2.12:0.2.0,\ -io.delta:delta-core_2.12:1.0.0,\ -com.amazonaws:aws-java-sdk:1.12.20,\ -org.apache.hadoop:hadoop-common:3.2.0,\ -org.apache.hadoop:hadoop-client:3.2.0,\ -org.apache.hadoop:hadoop-aws:3.2.0 +--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \ +--packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.2.0 ``` As an **_extra configuration_**, you can also change two global parameters of the index: @@ -37,26 +32,28 @@ As an **_extra configuration_**, you can also change two global parameters of th ``` Consult the [Qbeast-Spark advanced configuration](AdvancedConfiguration.md) for more information. -Read the ***store_sales*** public dataset from `TPC-DS`, the table has with **23** columns in total and was generated with a `scaleFactor` of 1. Check [The Making of TPC-DS](http://www.tpc.org/tpcds/presentations/the_making_of_tpcds.pdf) for more details on the dataset. - +Read the ***ecommerce*** test dataset from [Kaggle](https://www.kaggle.com/code/adilemrebilgic/e-commerce-analytics/data). ```scala -val parquetTablePath = "s3a://qbeast-public-datasets/store_sales" - -val parquetDf = spark.read.format("parquet").load(parquetTablePath).na.drop() +val ecommerce = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .load("src/test/resources/ecommerce100K_2019_Oct.csv") ``` -Indexing the data with the desired columns, in this case `ss_cdemo_sk` and `ss_cdemo_sk`. +Indexing the data with the desired columns, in this case `user_id` and `product_id`. ```scala val qbeastTablePath = "/tmp/qbeast-test-data/qtable" -(parquetDf.write +(ecommerce.write .mode("overwrite") .format("qbeast") // Saving the dataframe in a qbeast datasource - .option("columnsToIndex", "ss_cdemo_sk,ss_cdemo_sk") // Indexing the table - .option("cubeSize", 300000) // The desired number of records of the resulting files/cubes. Default is 100000 + .option("columnsToIndex", "user_id,product_id") // Indexing the table + .option("cubeSize", "500") // The desired number of records of the resulting files/cubes. Default is 5M .save(qbeastTablePath)) ``` + ## Sampling Allow the sample operator to be pushed down to the source when sampling, reducing i/o and computational cost. @@ -80,6 +77,35 @@ qbeastDf.sample(0.1).explain() Notice that the sample operator is no longer present in the physical plan. It's converted into a `Filter (qbeast_hash)` instead and is used to select files during data scanning(`DataFilters` from `FileScan`). We skip reading many files in this way, involving less I/O. +## SQL + +Thanks to the `QbeastCatalog`, you can use plain SQL and `CREATE TABLE` or `INSERT INTO` in qbeast format. 
+
+To check the different catalog configurations, please go to [Advanced Configuration](AdvancedConfiguration.md).
+
+```scala
+ecommerce.createOrReplaceTempView("ecommerce_october")
+
+spark.sql("CREATE OR REPLACE TABLE ecommerce_qbeast USING qbeast AS SELECT * FROM ecommerce_october")
+
+// OR
+
+val ecommerceNovember = spark.read
+  .format("csv")
+  .option("header", "true")
+  .option("inferSchema", "true")
+  .load("./src/test/resources/ecommerce100K_2019_Nov.csv")
+
+ecommerceNovember.createOrReplaceTempView("ecommerce_november")
+
+spark.sql("INSERT INTO ecommerce_qbeast SELECT * FROM ecommerce_november")
+```
+
+Sampling also has a SQL operator, `TABLESAMPLE`, which can be expressed in terms of either rows or a percentage.
+
+```scala
+spark.sql("SELECT avg(price) FROM ecommerce_qbeast TABLESAMPLE(10 PERCENT)").show()
+```
+
 ## Analyze and Optimize

From c0e9f5fbd626a65a4811d77fd488a37e828e625e Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 12 Dec 2022 16:13:14 +0100
Subject: [PATCH 3/4] Add comments and rollback test name

---
 .../sources/catalog/QbeastCatalog.scala            | 17 +++++++++++++++++
 .../catalog/QbeastCatalogIntegrationTest.scala     |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index 4837bbcc6..9cb75c7ae 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -44,6 +44,10 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
 
   private var catalogName: String = null
 
+  /**
+   * Gets the delegated catalog of the session
+   * @return
+   */
   private def getDelegatedCatalog(): T = {
     val sessionCatalog = delegatedCatalog match {
       case null =>
         // In this case, any catalog has been delegated, so we need to search for the default
         SparkCatalogUtils.getV2SessionCatalog(SparkSession.active)
       case o => o
     }
@@ -54,6 +58,17 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces]
     sessionCatalog.asInstanceOf[T]
   }
 
+  /**
+   * Gets the session catalog depending on provider properties, if any
+   *
+   * The intention is to include the different catalog providers
+   * while we add the integrations with the formats.
+   * For example, for the "delta" provider it will return a DeltaCatalog instance.
+   *
+   * In this way, users may only need to instantiate one single unified catalog.
+ * @param properties the properties with the provider parameter + * @return + */ private def getSessionCatalog(properties: Map[String, String] = Map.empty): T = { properties.get("provider") match { case Some("delta") => deltaCatalog.asInstanceOf[T] @@ -224,6 +239,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces] override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { // Initialize the catalog with the corresponding name this.catalogName = name + // Initialize the catalog in any other provider that we can integrate with this.deltaCatalog.initialize(name, options) } @@ -233,6 +249,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces] // Check if the delegating catalog has Table and SupportsNamespace properties if (delegate.isInstanceOf[TableCatalog] && delegate.isInstanceOf[SupportsNamespaces]) { this.delegatedCatalog = delegate + // Set delegated catalog in any other provider that we can integrate with this.deltaCatalog.setDelegateCatalog(delegate) } else throw new IllegalArgumentException("Invalid session catalog: " + delegate) } diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala index 034352af9..ac69fffc6 100644 --- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala @@ -7,7 +7,7 @@ import org.apache.spark.sql.AnalysisException class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with CatalogTestSuite { "QbeastCatalog" should - "coexist with Delta Catalog" in withTmpDir(tmpDir => + "coexist with Delta tables" in withTmpDir(tmpDir => withExtendedSpark(sparkConf = new SparkConf() .setMaster("local[8]") .set("spark.sql.extensions", "io.qbeast.spark.internal.QbeastSparkSessionExtension") From 8d3354c93c1c92506c03d1290451d1534bf4526b Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Wed, 14 Dec 2022 10:13:55 +0100 Subject: [PATCH 4/4] Change version and docs --- CONTRIBUTING.md | 2 +- README.md | 13 +++++++------ build.sbt | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 73d7e8883..e6d1c5ec7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -125,7 +125,7 @@ For example: sbt assembly $SPARK_HOME/bin/spark-shell \ ---jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0-alpha.jar \ +--jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0.jar \ --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \ --packages io.delta:delta-core_2.12:1.2.0 ``` diff --git a/README.md b/README.md index 888504cd8..d728dae6d 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,10 @@ export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2 ```bash $SPARK_HOME/bin/spark-shell \ +--repositories https://s01.oss.sonatype.org/content/repositories/snapshots \ --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \ --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \ ---packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.0.0 +--packages io.qbeast:qbeast-spark_2.12:0.3.0-SNAPSHOT,io.delta:delta-core_2.12:1.2.0 ``` ### 2. Indexing a dataset @@ -173,11 +174,11 @@ qbeastTable.analyze() Go to [QbeastTable documentation](./docs/QbeastTable.md) for more detailed information. 
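As a rough sketch of that analyze-and-optimize workflow (method names follow the QbeastTable documentation linked above; the import path is assumed and the table path simply reuses the Quickstart example):

```scala
import io.qbeast.spark.QbeastTable

// Load the metadata of a table previously written with the qbeast format
// (any qbeast table path works; this one comes from the Quickstart example).
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast-test-data/qtable")

// Announce the cubes that would benefit from optimization...
qbeastTable.analyze()

// ...and rewrite the announced cubes to improve the index layout.
qbeastTable.optimize()
```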
# Dependencies and Version Compatibility -| Version | Spark | Hadoop | Delta Lake | -|-------------|:-----:|:------:|:----------:| -| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 | -| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 | -| 0.3.0-alpha | 3.2.x | 3.3.x | 1.2.x | +| Version | Spark | Hadoop | Delta Lake | +|------------|:-----:|:------:|:----------:| +| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 | +| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 | +| 0.3.0 | 3.2.x | 3.3.x | 1.2.x | Check [here](https://docs.delta.io/latest/releases.html) for **Delta Lake** and **Apache Spark** version compatibility. diff --git a/build.sbt b/build.sbt index 86ea77089..656c4eda7 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ import Dependencies._ import xerial.sbt.Sonatype._ -val mainVersion = "0.3.0-alpha" +val mainVersion = "0.3.0" lazy val qbeastCore = (project in file("core")) .settings(