Merge pull request #125 from Jiaweihu08/81-infer-params-when-appending
Infer parameters for Appends
Jiaweihu08 authored Sep 5, 2022
2 parents 5d86c47 + d8cadcc commit 90b1d3b
Showing 4 changed files with 129 additions and 5 deletions.
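In short: once a qbeast table exists, appends no longer need to repeat columnsToIndex or cubeSize; both are inferred from the table's latest revision. A minimal sketch of the resulting usage, mirroring the integration tests further down (df, the path, and the column names are placeholders):

// Illustrative only: `df`, the path, and the column names are placeholders.
// The first write still has to declare the indexing parameters.
df.write
  .format("qbeast")
  .option("columnsToIndex", "user_id,product_id")
  .option("cubeSize", 10000)
  .save("/tmp/qbeast-table")

// Later appends can omit them; both are inferred from the latest revision.
df.write
  .mode("append")
  .format("qbeast")
  .save("/tmp/qbeast-table")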
@@ -65,9 +65,11 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {

require(
parameters.contains("columnsToIndex"),
throw AnalysisExceptionFactory.create("'columnsToIndex is not specified"))
parameters.contains("columnsToIndex") || mode == SaveMode.Append,
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))

val tableId = SparkToQTypesUtils.loadFromParameters(parameters)
val table = tableFactory.getIndexedTable(tableId)
mode match {
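The relaxed precondition above only skips the columnsToIndex check for appends; any other save mode still fails fast. Restated as a self-contained sketch (the helper name is hypothetical, and a plain require stands in for the AnalysisExceptionFactory call used in the real code):

import org.apache.spark.sql.SaveMode

// Hypothetical helper: 'columnsToIndex' may only be omitted when appending,
// because an append can inherit it from the existing table's latest revision.
def validateColumnsToIndex(mode: SaveMode, parameters: Map[String, String]): Unit =
  require(
    parameters.contains("columnsToIndex") || mode == SaveMode.Append,
    "'columnsToIndex' is not specified")

validateColumnsToIndex(SaveMode.Append, Map.empty)       // passes
// validateColumnsToIndex(SaveMode.Overwrite, Map.empty) // throws IllegalArgumentException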
26 changes: 24 additions & 2 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -8,6 +8,7 @@ import io.qbeast.core.model._
import io.qbeast.spark.delta.CubeDataLoader
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
import org.apache.spark.sql.delta.actions.FileAction
@@ -148,19 +149,40 @@

}

/**
* Add the required indexing parameters when the SaveMode is Append.
* The user-provided parameters are respected.
* @param latestRevision the latest revision of the table
* @param parameters the user-provided parameters
*/
private def addRequiredParams(
latestRevision: Revision,
parameters: Map[String, String]): Map[String, String] = {
val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
val desiredCubeSize = latestRevision.desiredCubeSize.toString
(parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match {
case (true, true) => parameters
case (false, false) =>
parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize)
case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize)
case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex)
}
}

override def save(
data: DataFrame,
parameters: Map[String, String],
append: Boolean): BaseRelation = {
val indexStatus =
if (exists && append) {
val latestIndexStatus = snapshot.loadLatestIndexStatus
if (checkRevisionParameters(QbeastOptions(parameters), latestIndexStatus)) {
val updatedParameters = addRequiredParams(latestIndexStatus.revision, parameters)
if (checkRevisionParameters(QbeastOptions(updatedParameters), latestIndexStatus)) {
latestIndexStatus
} else {
val oldRevisionID = latestIndexStatus.revision.revisionID
val newRevision = revisionBuilder
.createNextRevision(tableID, data.schema, parameters, oldRevisionID)
.createNextRevision(tableID, data.schema, updatedParameters, oldRevisionID)
IndexStatus(newRevision)
}
} else {
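The parameter inference itself is a straightforward merge: whatever the user passed wins, and anything missing is taken from the latest revision. A simplified, self-contained sketch of that merge (the helper name is hypothetical and the Revision fields are reduced to plain strings):

// Hypothetical, simplified stand-in for addRequiredParams: options the user
// passed win, and anything missing is filled in from the latest revision.
def withInferredParams(
    revisionColumnsToIndex: String, // e.g. "user_id,product_id"
    revisionCubeSize: String, // e.g. "10000"
    parameters: Map[String, String]): Map[String, String] = {
  val inferred = Map(
    "columnsToIndex" -> revisionColumnsToIndex,
    "cubeSize" -> revisionCubeSize)
  inferred ++ parameters // keys already present in `parameters` take precedence
}

withInferredParams("user_id,product_id", "10000", Map("cubeSize" -> "5000"))
// => Map("columnsToIndex" -> "user_id,product_id", "cubeSize" -> "5000")

Because the user-provided map is applied last, an explicit cubeSize on an append can still trigger a new revision, which is exactly what the third integration test below exercises.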
@@ -157,7 +157,7 @@ class QbeastDataSourceTest extends FixtureAnyFlatSpec with MockitoSugar with Mat
val parameters = Map("path" -> path)
val data = mock[DataFrame]
a[AnalysisException] shouldBe thrownBy {
f.dataSource.createRelation(f.sqlContext, SaveMode.Append, parameters, data)
f.dataSource.createRelation(f.sqlContext, SaveMode.Overwrite, parameters, data)
}
}

@@ -5,6 +5,7 @@ package io.qbeast.spark.utils

import io.qbeast.spark.QbeastIntegrationTestSpec
import io.qbeast.spark.delta.DeltaQbeastSnapshot
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions._

@@ -168,4 +169,103 @@ class QbeastDataSourceIntegrationTest extends QbeastIntegrationTestSpec {
}
}

"Appending to an existing qbeast table" should
"work without specifying cubeSize or columnsToIndex" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>
{
val original = loadTestData(spark)
original.write
.format("qbeast")
.option("cubeSize", 10000)
.option("columnsToIndex", "user_id,product_id")
.save(tmpDir)

original.write
.mode("append")
.format("qbeast")
.save(tmpDir)
val qDf = spark.read.format("qbeast").load(tmpDir)

qDf.count shouldBe original.count * 2
}
}

it should "work without specifying columnsToIndex" in
withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
{
val original = loadTestData(spark)
original.write
.format("qbeast")
.option("cubeSize", 10000)
.option("columnsToIndex", "user_id,product_id")
.save(tmpDir)

original.write
.mode("append")
.format("qbeast")
.option("cubeSize", 10000)
.save(tmpDir)
val qDf = spark.read.format("qbeast").load(tmpDir)

qDf.count shouldBe original.count * 2
}
}

it should "work without specifying columnsToIndex " +
"while cause revision change by using a different cubeSize" in
withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
{
val original = loadTestData(spark)
original.write
.format("qbeast")
.option("cubeSize", 10000)
.option("columnsToIndex", "user_id,product_id")
.save(tmpDir)

original.write
.mode("append")
.format("qbeast")
.option("cubeSize", 5000)
.save(tmpDir)
val qDf = spark.read.format("qbeast").load(tmpDir)

qDf.count shouldBe original.count * 2
}
}

it should "append to an existing qbeast table without specifying cubeSize" in
withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
{
val original = loadTestData(spark)
original.write
.format("qbeast")
.option("cubeSize", 10000)
.option("columnsToIndex", "user_id,product_id")
.save(tmpDir)

original.write
.mode("append")
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.save(tmpDir)
val qDf = spark.read.format("qbeast").load(tmpDir)

qDf.count shouldBe original.count * 2
}
}

"Appending to an non-existing table" should
"throw an exception if 'columnsToIndex' is not provided" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>
{
val original = loadTestData(spark)
a[AnalysisException] shouldBe thrownBy {
original.write
.format("qbeast")
.option("cubeSize", 10000)
.save(tmpDir)
}
}
}

}
