From 386d4a1ca7cd3ebee40af97d233e6768683005c2 Mon Sep 17 00:00:00 2001 From: Jiawei Date: Wed, 17 Aug 2022 19:14:35 +0200 Subject: [PATCH 1/7] Allow Append operations to omit columnsToIndex --- .../qbeast/spark/internal/sources/QbeastDataSource.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala index c45c35dcf..00a39204a 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala @@ -65,9 +65,11 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - require( - parameters.contains("columnsToIndex"), - throw AnalysisExceptionFactory.create("'columnsToIndex is not specified")) + if (mode != SaveMode.Append) { + require( + parameters.contains("columnsToIndex"), + throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified")) + } val tableId = SparkToQTypesUtils.loadFromParameters(parameters) val table = tableFactory.getIndexedTable(tableId) mode match { From c81397d970ffd929641e95e10bc0e859570bf3d7 Mon Sep 17 00:00:00 2001 From: Jiawei Date: Wed, 17 Aug 2022 19:16:19 +0200 Subject: [PATCH 2/7] Use existing indexing params if not provided by user when appending data --- .../io/qbeast/spark/table/IndexedTable.scala | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala index 77446a2d5..ecd2d3ede 100644 --- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala +++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala @@ -8,6 +8,7 @@ import io.qbeast.core.model._ import io.qbeast.spark.delta.CubeDataLoader import io.qbeast.spark.index.QbeastColumns import io.qbeast.spark.internal.QbeastOptions +import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE} import io.qbeast.spark.internal.sources.QbeastBaseRelation import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES import org.apache.spark.sql.delta.actions.FileAction @@ -143,6 +144,26 @@ private[table] class IndexedTableImpl( } + /** + * Add the required indexing parameters when the SaveMode is Append. + * The user-provided parameters are respected. + * @param latestRevision the latest revision + * @param params the parameters required for indexing + */ + private def addRequiredParams( + latestRevision: Revision, + parameters: Map[String, String]): Map[String, String] = { + val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",") + val desiredCubeSize = latestRevision.desiredCubeSize.toString + (parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match { + case (true, true) => parameters + case (false, false) => + parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize) + case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize) + case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex) + } + } + override def save( data: DataFrame, parameters: Map[String, String], @@ -150,12 +171,13 @@ private[table] class IndexedTableImpl( val indexStatus = if (exists && append) { val latestIndexStatus = snapshot.loadLatestIndexStatus - if (checkRevisionParameters(QbeastOptions(parameters), latestIndexStatus)) { + val updatedParameters = addRequiredParams(latestIndexStatus.revision, parameters) + if (checkRevisionParameters(QbeastOptions(updatedParameters), latestIndexStatus)) { latestIndexStatus } else { val oldRevisionID = latestIndexStatus.revision.revisionID val newRevision = revisionBuilder - .createNextRevision(tableID, data.schema, parameters, oldRevisionID) + .createNextRevision(tableID, data.schema, updatedParameters, oldRevisionID) IndexStatus(newRevision) } } else { From 2c1c057dbe4506ef702d92b6da9a5aede5156ab8 Mon Sep 17 00:00:00 2001 From: Jiawei Date: Wed, 17 Aug 2022 19:16:58 +0200 Subject: [PATCH 3/7] Test append operations without specifying cubeSize and/or columnsToIndex --- .../QbeastDataSourceIntegrationTest.scala | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala index 9e92ab726..22d95540a 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala @@ -168,4 +168,88 @@ class QbeastDataSourceIntegrationTest extends QbeastIntegrationTestSpec { } } + "Appending to an existing qbeast table" should + "work without specifying cubeSize or columnsToIndex" in withQbeastContextSparkAndTmpDir { + (spark, tmpDir) => + { + val original = loadTestData(spark) + original.write + .format("qbeast") + .option("cubeSize", 10000) + .option("columnsToIndex", "user_id,product_id") + .save(tmpDir) + + original.write + .mode("append") + .format("qbeast") + .save(tmpDir) + val qDf = spark.read.format("qbeast").load(tmpDir) + + qDf.count shouldBe original.count * 2 + } + } + + it should "work without specifying columnsToIndex" in + withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val original = loadTestData(spark) + original.write + .format("qbeast") + .option("cubeSize", 10000) + .option("columnsToIndex", "user_id,product_id") + .save(tmpDir) + + original.write + .mode("append") + .format("qbeast") + .option("cubeSize", 10000) + .save(tmpDir) + val qDf = spark.read.format("qbeast").load(tmpDir) + + qDf.count shouldBe original.count * 2 + } + } + + it should "work without specifying columnsToIndex" + + "while cause revision change by using a different cubeSize" in + withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val original = loadTestData(spark) + original.write + .format("qbeast") + .option("cubeSize", 10000) + .option("columnsToIndex", "user_id,product_id") + .save(tmpDir) + + original.write + .mode("append") + .format("qbeast") + .option("cubeSize", 5000) + .save(tmpDir) + val qDf = spark.read.format("qbeast").load(tmpDir) + + qDf.count shouldBe original.count * 2 + } + } + + it should "append to an existing qbeast table without specifying cubeSize" in + withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val original = loadTestData(spark) + original.write + .format("qbeast") + .option("cubeSize", 10000) + .option("columnsToIndex", "user_id,product_id") + .save(tmpDir) + + original.write + .mode("append") + .format("qbeast") + .option("columnsToIndex", "user_id,product_id") + .save(tmpDir) + val qDf = spark.read.format("qbeast").load(tmpDir) + + qDf.count shouldBe original.count * 2 + } + } } From 7126124a8752f5c1233273b49a6c2c5002bd0d82 Mon Sep 17 00:00:00 2001 From: Jiawei Date: Thu, 18 Aug 2022 10:21:38 +0200 Subject: [PATCH 4/7] Add missing space --- .../io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala index 22d95540a..73e052441 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala @@ -210,7 +210,7 @@ class QbeastDataSourceIntegrationTest extends QbeastIntegrationTestSpec { } } - it should "work without specifying columnsToIndex" + + it should "work without specifying columnsToIndex " + "while cause revision change by using a different cubeSize" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => { From e456dfc6c4df34f1241e8ae6b0a0f8acc0967b0d Mon Sep 17 00:00:00 2001 From: Jiawei Date: Thu, 18 Aug 2022 10:22:33 +0200 Subject: [PATCH 5/7] Append should not throw exception when columnsToIndex is not provided, change test to use SaveMode.Overwrite --- .../io/qbeast/spark/internal/sources/QbeastDataSourceTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/io/qbeast/spark/internal/sources/QbeastDataSourceTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/QbeastDataSourceTest.scala index 945efdfff..5cac6903e 100644 --- a/src/test/scala/io/qbeast/spark/internal/sources/QbeastDataSourceTest.scala +++ b/src/test/scala/io/qbeast/spark/internal/sources/QbeastDataSourceTest.scala @@ -157,7 +157,7 @@ class QbeastDataSourceTest extends FixtureAnyFlatSpec with MockitoSugar with Mat val parameters = Map("path" -> path) val data = mock[DataFrame] a[AnalysisException] shouldBe thrownBy { - f.dataSource.createRelation(f.sqlContext, SaveMode.Append, parameters, data) + f.dataSource.createRelation(f.sqlContext, SaveMode.Overwrite, parameters, data) } } From da534d3106ef1fe24117c49a461605a3acb7c930 Mon Sep 17 00:00:00 2001 From: Jiawei Date: Thu, 18 Aug 2022 11:03:25 +0200 Subject: [PATCH 6/7] Simplify indexing parameter requirement --- .../spark/internal/sources/QbeastDataSource.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala index 00a39204a..3559d2d79 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala @@ -65,11 +65,11 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - if (mode != SaveMode.Append) { - require( - parameters.contains("columnsToIndex"), - throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified")) - } + + require( + parameters.contains("columnsToIndex") || mode == SaveMode.Append, + throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified")) + val tableId = SparkToQTypesUtils.loadFromParameters(parameters) val table = tableFactory.getIndexedTable(tableId) mode match { From d8cadcc90a0b0934e677757b53a92387d9b9db36 Mon Sep 17 00:00:00 2001 From: Jiawei Date: Thu, 18 Aug 2022 11:18:24 +0200 Subject: [PATCH 7/7] 'columnsToIndex' should be provided when SaveMode.Append is used for the first write --- .../utils/QbeastDataSourceIntegrationTest.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala index 73e052441..40d3756db 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastDataSourceIntegrationTest.scala @@ -5,6 +5,7 @@ package io.qbeast.spark.utils import io.qbeast.spark.QbeastIntegrationTestSpec import io.qbeast.spark.delta.DeltaQbeastSnapshot +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.functions._ @@ -252,4 +253,19 @@ class QbeastDataSourceIntegrationTest extends QbeastIntegrationTestSpec { qDf.count shouldBe original.count * 2 } } + + "Appending to an non-existing table" should + "throw an exception if 'columnsToIndex' is not provided" in withQbeastContextSparkAndTmpDir { + (spark, tmpDir) => + { + val original = loadTestData(spark) + a[AnalysisException] shouldBe thrownBy { + original.write + .format("qbeast") + .option("cubeSize", 10000) + .save(tmpDir) + } + } + } + }