Convert to Qbeast #152

Merged 79 commits on Jan 27, 2023
Commits
8a61514
First naive implementation of convert to qbeast
osopardo1 May 10, 2022
99a823b
Add headers
osopardo1 May 11, 2022
f8bd3d0
Merge branch 'main' into convert-to-qbeast
osopardo1 Sep 8, 2022
feb0754
Change RunnableCommand to LeafRunnableCommand
osopardo1 Sep 8, 2022
5d4e0bc
Access deltaLog/snapshot and modify log
Jiaweihu08 Sep 9, 2022
51c5bb6
Add transformers to revision
Jiaweihu08 Sep 12, 2022
9a2236b
Converted tables should be readable using both formats
Jiaweihu08 Sep 12, 2022
76219b3
Add AddFiles with qbeast metadata using append write mode
Jiaweihu08 Sep 20, 2022
d3d472d
Check input format
Jiaweihu08 Sep 20, 2022
8f5ffe2
Simplify test
Jiaweihu08 Sep 20, 2022
ac90d08
Refactor code
Jiaweihu08 Sep 20, 2022
b4d1466
Add method for parquet to delta conversion
Jiaweihu08 Sep 20, 2022
471cb06
Reformat tests
Jiaweihu08 Sep 20, 2022
b22afc0
Separate metadata tag computation
Jiaweihu08 Sep 20, 2022
1de47a8
Extract record count from parquet file metadata
Jiaweihu08 Sep 21, 2022
d435fc7
Add test for parquet to qbeast conversion, test index metrics resulte…
Jiaweihu08 Sep 21, 2022
0d26406
Test String data type
Jiaweihu08 Sep 21, 2022
6f92dcc
Make command idempotent
Jiaweihu08 Sep 21, 2022
0208678
Test command idempotence
Jiaweihu08 Sep 21, 2022
909bc11
Test converting a partitioned delta table
Jiaweihu08 Sep 21, 2022
c30c6e8
Use global path parameter
Jiaweihu08 Sep 21, 2022
304c991
Consider cases where the provided fileFormat doesn't match with real …
Jiaweihu08 Sep 22, 2022
72e40ae
Add format inference, conversion for partitioned parquet files, infe…
Jiaweihu08 Sep 22, 2022
f009a54
Add comment
Jiaweihu08 Sep 22, 2022
daf4454
Test extraction of numRecords when AddFile stats is corrupted
Jiaweihu08 Sep 22, 2022
c8efff5
Remove unnecessary assertions
Jiaweihu08 Sep 22, 2022
389401d
Spark data type name extraction for only supported partition data types
Jiaweihu08 Sep 23, 2022
1a4e5d5
Test supported partition data types
Jiaweihu08 Sep 23, 2022
f5858f9
Secure return type
Jiaweihu08 Sep 23, 2022
02e9100
Use wider default column min max to avoid out-of-scope points
Jiaweihu08 Sep 23, 2022
c1c16d5
Add tests for Analyze, and Compaction on a converted qbeast table
Jiaweihu08 Sep 23, 2022
25a3956
Use table path from tableID
Jiaweihu08 Sep 23, 2022
56baac9
Add test for optimization on a converted table
Jiaweihu08 Sep 23, 2022
6171063
Remove command
Jiaweihu08 Sep 28, 2022
486711f
Add staging files during read
Jiaweihu08 Jan 16, 2023
116ae77
WIP, Qbeast format compatibility with delta lake
Jiaweihu08 Jan 18, 2023
e48732a
WIP, Use all columns for staging sampling
Jiaweihu08 Jan 19, 2023
3338c90
Merge branch 'main' into convert-to-qbeast
Jiaweihu08 Jan 20, 2023
fbb84c0
Rely on user input for source format, direct metadata update
Jiaweihu08 Jan 20, 2023
1da9bff
Add comment
Jiaweihu08 Jan 23, 2023
e518465
Merge to main
Jiaweihu08 Jan 23, 2023
6e00d91
Use the indexing columns from the latest revision for online sampling
Jiaweihu08 Jan 23, 2023
df27cd3
Use EmptyTransformation/ers for the conversion revision
Jiaweihu08 Jan 23, 2023
a4c8919
Test Conversion command. Analyze command should not change ANNOUNCED …
Jiaweihu08 Jan 23, 2023
9a36b1c
Proper merge method for EmptyTransformation
Jiaweihu08 Jan 23, 2023
951b68e
Is a qbeast table if it is converted or written using qbeast format
Jiaweihu08 Jan 23, 2023
9f4df90
Use conversion revisionID(0) as staging revisionID
Jiaweihu08 Jan 23, 2023
d3b1cfd
ConvertToQbeast can add conversion/staging revision to a qbeast table
Jiaweihu08 Jan 23, 2023
770b49f
Discard staging revision via the creation of EmptySpace
Jiaweihu08 Jan 24, 2023
9d7bb4f
Separate staging cube statuses computation
Jiaweihu08 Jan 24, 2023
371b060
Add staging revision during first writes
Jiaweihu08 Jan 24, 2023
a2218ff
Add staging revision during first write, idempotent conversion
Jiaweihu08 Jan 24, 2023
08c4aaf
Remove staging revision from loadRevisionAt
Jiaweihu08 Jan 24, 2023
1ff0dd2
Allow the creation of empty cubes for the staging revision
Jiaweihu08 Jan 24, 2023
08395d1
Compaction with no dataChange
Jiaweihu08 Jan 24, 2023
b009c37
Include staging revision and use initial revisionID = 1 for test
Jiaweihu08 Jan 24, 2023
7b976a2
Check for staging revision via Transformer type
Jiaweihu08 Jan 24, 2023
85a6fcd
Avoid appending on the staging revision
Jiaweihu08 Jan 24, 2023
abc6ea3
Complete conversion test
Jiaweihu08 Jan 24, 2023
a353645
Complete hybrid table test
Jiaweihu08 Jan 24, 2023
4c4ae59
Code comment
Jiaweihu08 Jan 25, 2023
44245df
Update documentation to include the staging revision and the ConvertT…
Jiaweihu08 Jan 25, 2023
5895f97
Optimize imports
Jiaweihu08 Jan 25, 2023
2c6711d
Check total num revisions after append on a converted table
Jiaweihu08 Jan 25, 2023
ab91ff0
Place staging revision utils under RevisionUtils
Jiaweihu08 Jan 25, 2023
d550497
Test EmptyTransformer/ation
Jiaweihu08 Jan 25, 2023
6bc7ceb
Add comment and remove unnecessary changes
Jiaweihu08 Jan 25, 2023
a7cbe6e
Move params and methods to RevisionUtils
Jiaweihu08 Jan 25, 2023
b01e806
isStagingFile is specific to QbeastSnapshot
Jiaweihu08 Jan 25, 2023
ba00ec4
Correct typo
Jiaweihu08 Jan 25, 2023
8969983
Fix loadRevisionAt
Jiaweihu08 Jan 25, 2023
d61538f
'Remove' partitioned table conversion support
Jiaweihu08 Jan 26, 2023
5620d25
Create exception message object
Jiaweihu08 Jan 26, 2023
c64fb5c
Update documentation
Jiaweihu08 Jan 26, 2023
f089c50
Test table identifier format
Jiaweihu08 Jan 26, 2023
b69320f
Test loadRevisionAt with invalid timestamp
Jiaweihu08 Jan 26, 2023
60d36aa
Skip Analyze and Optimize for the staging RevisionID
Jiaweihu08 Jan 26, 2023
57cacd4
Use AnalysisException, more test for EmptyTransformer
Jiaweihu08 Jan 26, 2023
1332bc9
Update metadata in MetadataManager
Jiaweihu08 Jan 27, 2023
11 changes: 11 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -8,6 +8,7 @@ import io.qbeast.IISeq
* @tparam FileDescriptor type of file descriptor
*/
trait MetadataManager[DataSchema, FileDescriptor] {
type Configuration = Map[String, String]

/**
* Gets the Snapshot for a given table
@@ -33,6 +34,16 @@ trait MetadataManager[DataSchema, FileDescriptor] {
def updateWithTransaction(tableID: QTableID, schema: DataSchema, append: Boolean)(
writer: => (TableChanges, IISeq[FileDescriptor])): Unit

/**
* Updates the table metadata by overwriting the metadata configurations
* with the provided key-value pairs.
* @param tableID QTableID
* @param schema table schema
* @param update configurations used to overwrite the existing metadata
*/
def updateMetadataWithTransaction(tableID: QTableID, schema: DataSchema)(
update: => Configuration): Unit

/**
* Updates the Revision with the given RevisionChanges
* @param tableID the QTableID
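A hypothetical caller of the new hook might look like this. This is a sketch, assuming the Spark/Delta implementation binds `DataSchema = StructType` and `FileDescriptor = FileAction`; the helper name and the configuration key are illustrative, not part of this PR:

```scala
import io.qbeast.core.model.{MetadataManager, QTableID}
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.types.StructType

// Hypothetical helper: persist a serialized revision through the new hook.
def writeConversionMetadata(
    manager: MetadataManager[StructType, FileAction],
    tableID: QTableID,
    schema: StructType,
    revisionJson: String): Unit = {
  // The by-name `update` block is evaluated inside the transaction and
  // returns the key-value pairs that overwrite the existing configuration.
  manager.updateMetadataWithTransaction(tableID, schema) {
    Map("qbeast.revision.0" -> revisionJson) // illustrative key
  }
}
```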
46 changes: 41 additions & 5 deletions core/src/main/scala/io/qbeast/core/model/RevisionClasses.scala
Expand Up @@ -4,7 +4,8 @@ import com.fasterxml.jackson.annotation.{JsonCreator, JsonValue}
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize.Typing
import io.qbeast.IISeq
import io.qbeast.core.transform.{Transformation, Transformer}
import io.qbeast.core.model.RevisionUtils.stagingID
import io.qbeast.core.transform.{EmptyTransformer, Transformation, Transformer}

import scala.collection.immutable.SortedMap

@@ -60,11 +61,46 @@
desiredCubeSize,
columnTransformers,
Vector.empty)
}

/**
* Initialize Revision for table conversion. The RevisionID for a converted table is 0.
* EmptyTransformers and EmptyTransformations are used. This Revision should always be
* superseded.
*/
def emptyRevision(
tableID: QTableID,
desiredCubeSize: Int,
columnsToIndex: Seq[String]): Revision = {
val emptyTransformers = columnsToIndex.map(s => EmptyTransformer(s)).toIndexedSeq
val emptyTransformations = emptyTransformers.map(_.makeTransformation(r => r))

Revision(
stagingID,
System.currentTimeMillis(),
tableID,
desiredCubeSize,
emptyTransformers,
emptyTransformations)
}

}

object RevisionUtils {
val stagingID: RevisionID = 0

def isStaging(revisionID: RevisionID): Boolean =
revisionID == stagingID

def isStaging(revision: Revision): Boolean =
isStaging(revision.revisionID) &&
revision.columnTransformers.forall {
case _: EmptyTransformer => true
case _ => false
}

}

/**
* A revision of a QTable.
* @param revisionID the identifier of the revision
@@ -89,8 +125,7 @@ final case class Revision(
assert(columnTransformers != null || transformations != null)

/**
* *
* Controls that the this revision indexes all and only the provided columns.
* Controls that this revision indexes all and only the provided columns.
*
* @param columnsToIndex the column names to check.
* @return true if the revision indexes all and only the provided columns.
@@ -117,7 +152,7 @@

/**
* returns the normalized values
* @param values
* @param values row values for the indexing columns
* @return the normalized values
*/
def transform(values: IISeq[_]): IISeq[Double] = {
@@ -193,8 +228,9 @@ case class IndexStatus(
cubesStatuses: SortedMap[CubeId, CubeStatus] = SortedMap.empty)
extends Serializable {

def addAnnouncements(newAnnouncedSet: Set[CubeId]): IndexStatus =
def addAnnouncements(newAnnouncedSet: Set[CubeId]): IndexStatus = {
copy(announcedSet = announcedSet ++ newAnnouncedSet)
}

def cubesToOptimize: Set[CubeId] = announcedSet.diff(replicatedSet)

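A minimal sketch of the new staging helpers, assuming a `QTableID` can be built from a path string; the path, cube size, and column names are placeholders:

```scala
import io.qbeast.core.model.{QTableID, Revision, RevisionUtils}

val tableID = new QTableID("/pathToTable/") // placeholder path
val staging: Revision =
  Revision.emptyRevision(tableID, desiredCubeSize = 50000, Seq("col1", "col2"))

// Both checks hold: the staging RevisionID is 0, and every
// column transformer of the staging revision is an EmptyTransformer.
RevisionUtils.isStaging(staging.revisionID) // true
RevisionUtils.isStaging(staging) // true
```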
13 changes: 13 additions & 0 deletions core/src/main/scala/io/qbeast/core/transform/EmptyTransformation.scala
@@ -0,0 +1,13 @@
package io.qbeast.core.transform

/**
* An empty Transformation meant for empty revisions
*/
case class EmptyTransformation() extends Transformation {

override def transform(value: Any): Double = 0d

override def isSupersededBy(newTransformation: Transformation): Boolean = true

override def merge(other: Transformation): Transformation = other
}
22 changes: 22 additions & 0 deletions core/src/main/scala/io/qbeast/core/transform/EmptyTransformer.scala
@@ -0,0 +1,22 @@
package io.qbeast.core.transform

import io.qbeast.core.model.QDataType

object EmptyTransformer extends TransformerType {
override def transformerSimpleName: String = "empty"

override def apply(columnName: String, dataType: QDataType): Transformer =
EmptyTransformer(columnName)

}

/**
* An empty Transformer meant for empty revisions
*/
case class EmptyTransformer(columnName: String) extends Transformer {
override protected def transformerType: TransformerType = EmptyTransformer

override def stats: ColumnStats = NoColumnStats

override def makeTransformation(row: String => Any): Transformation = EmptyTransformation()
}
35 changes: 35 additions & 0 deletions core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala
@@ -0,0 +1,35 @@
package io.qbeast.core.transform

import io.qbeast.core.model.DoubleDataType
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class EmptyTransformationTest extends AnyFlatSpec with Matchers {
it should "always map to the same value" in {
val t = EmptyTransformation()

(1 to 100).foreach { i =>
t.transform(i) shouldBe 0d
}

t.transform(null) shouldBe 0d
}

it should "be superseded by another Transformation" in {
val et = EmptyTransformation()
val ht = HashTransformation()
val lt = LinearTransformation(1d, 1.1, DoubleDataType)

et.isSupersededBy(ht) shouldBe true
et.isSupersededBy(lt) shouldBe true
}

it should "return the other Transformation when merging" in {
val et = EmptyTransformation()
val ht = HashTransformation()
val lt = LinearTransformation(1d, 1.1, DoubleDataType)

et.merge(ht) shouldBe ht
et.merge(lt) shouldBe lt
}
}
core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala
@@ -1,8 +1,9 @@
package io.qbeast.core.transform

import io.qbeast.core.model.{DateDataType, IntegerDataType, TimestampDataType}
import io.qbeast.core.model.{DateDataType, IntegerDataType, StringDataType, TimestampDataType}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.sql.{Date, Timestamp}

class TransformerTest extends AnyFlatSpec with Matchers {
@@ -94,4 +95,14 @@ class TransformerTest extends AnyFlatSpec with Matchers {

transformer.maybeUpdateTransformation(currentTransformation, transformation) shouldBe None
}

"An EmptyTransformer" should "create an EmptyTransformation without stats" in {
EmptyTransformer.transformerSimpleName shouldBe "empty"

val colName = "a"
val transformer = EmptyTransformer(colName, StringDataType)

val transformation = transformer.makeTransformation(r => r)
transformation shouldBe a[EmptyTransformation]
}
}
30 changes: 30 additions & 0 deletions docs/QbeastFormat.md
@@ -153,6 +153,36 @@ In Revision, you can find different information about the tree status and configuration

In this case, we index columns `user_id` and `product_id` (both `Integer`s) with a linear transformation. This means they will not undergo any transformation besides normalization.

### Staging Revision and ConvertToQbeastCommand
The introduction of the staging revision enables reading tables in a hybrid `qbeast + delta` state.
The non-qbeast `AddFile`s are considered part of this staging revision, all belonging to the root.

Its RevisionID is fixed to `stagingID = 0`, and it has `EmptyTransformer`s and `EmptyTransformation`s.
It is automatically created during the first write or when overwriting a table using qbeast.
For a table that is entirely written in `delta` or `parquet`, we can use the `ConvertToQbeastCommand` to create this revision:
```scala
import io.qbeast.spark.internal.commands.ConvertToQbeastCommand

val path = "/pathToTable/"
val tableIdentifier = s"parquet.`$path`"
val columnsToIndex = Seq("col1", "col2", "col3")
val desiredCubeSize = 50000

ConvertToQbeastCommand(tableIdentifier, columnsToIndex, desiredCubeSize).run(spark)

val qTable = spark.read.format("qbeast").load(path)
```
By doing so, we also enable subsequent appends using either delta or qbeast, as sketched below.
Conversion of a partitioned table is not supported.
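Subsequent appends in either format might look like this (a sketch; `df` is any DataFrame matching the table schema, and `path` is the table location from above):

```scala
// Qbeast append: the new data is indexed under a regular (non-staging) revision.
df.write.mode("append").format("qbeast").save(path)

// Delta append: the new files become part of the staging revision.
df.write.mode("append").format("delta").save(path)
```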

`Compaction` can be performed on the staging revision to group small delta files:
```scala
import io.qbeast.spark.QbeastTable

val table = QbeastTable.forPath(spark, "/pathToTable/")
table.compact(0)
```
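Because staging files are exposed through the staging revision, reading and sampling a hybrid table needs no special handling. A sketch (how exactly staging files participate in sampling is internal to this PR):

```scala
val hybrid = spark.read.format("qbeast").load("/pathToTable/")
hybrid.sample(0.1).count() // staging (delta) files are included in the sample
```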

### State changes in MetaData

**Data de-normalization** is a crucial component behind our multi-dimensional index. Instead of storing an index in a separate tree-like data structure, we reorganize the data and their replications in an `OTree`, whose **hierarchical structure** is the actual index.
Expand Down
29 changes: 19 additions & 10 deletions src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -4,6 +4,7 @@
package io.qbeast.spark

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.RevisionUtils.isStaging
import io.qbeast.core.model.{CubeId, CubeStatus, QTableID, RevisionID}
import io.qbeast.spark.delta.DeltaQbeastSnapshot
import io.qbeast.spark.internal.commands.{
@@ -56,13 +57,17 @@
* If it doesn't exist or none is specified, the latest available revision is used
*/
def optimize(revisionID: RevisionID): Unit = {
checkRevisionAvailable(revisionID)
OptimizeTableCommand(revisionID, indexedTable)
.run(sparkSession)
if (!isStaging(revisionID)) {
checkRevisionAvailable(revisionID)
OptimizeTableCommand(revisionID, indexedTable)
.run(sparkSession)
}
}

def optimize(): Unit = {
optimize(latestRevisionAvailableID)
if (!isStaging(latestRevisionAvailableID)) {
optimize(latestRevisionAvailableID)
}
}

/**
@@ -73,14 +78,18 @@
* @return the sequence of cubes to optimize in string representation
*/
def analyze(revisionID: RevisionID): Seq[String] = {
checkRevisionAvailable(revisionID)
AnalyzeTableCommand(revisionID, indexedTable)
.run(sparkSession)
.map(_.getString(0))
if (isStaging(revisionID)) Seq.empty
else {
checkRevisionAvailable(revisionID)
AnalyzeTableCommand(revisionID, indexedTable)
.run(sparkSession)
.map(_.getString(0))
}
}

def analyze(): Seq[String] = {
analyze(latestRevisionAvailableID)
if (isStaging(latestRevisionAvailableID)) Seq.empty
else analyze(latestRevisionAvailableID)
}

/**
@@ -103,7 +112,7 @@
val allCubeStatuses = qbeastSnapshot.loadLatestIndexStatus.cubesStatuses

val cubeCount = allCubeStatuses.size
val depth = allCubeStatuses.map(_._1.depth).max
val depth = if (cubeCount == 0) 0 else allCubeStatuses.map(_._1.depth).max
val rowCount = allCubeStatuses.flatMap(_._2.files.map(_.elementCount)).sum

val dimensionCount = indexedColumns().size
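Taken together, the new guards turn staging-revision operations into no-ops instead of errors; `Compaction` remains available. A sketch:

```scala
import io.qbeast.spark.QbeastTable

val table = QbeastTable.forPath(spark, "/pathToTable/")
table.analyze(0) // Seq.empty: nothing to analyze in the staging revision
table.optimize(0) // no-op: optimization skips the staging revision
table.compact(0) // allowed: groups small staging (delta) files
```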
29 changes: 21 additions & 8 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -5,14 +5,9 @@ package io.qbeast.spark.delta

import io.qbeast.core.model.{QTableID, RevisionID, TableChanges}
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
import io.qbeast.spark.utils.TagColumns
import org.apache.spark.sql.delta.actions.{
Action,
AddFile,
FileAction,
RemoveFile,
SetTransaction
}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DeltaCommand
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
import org.apache.spark.sql.execution.datasources.{
@@ -84,6 +79,24 @@ private[delta] case class DeltaMetadataWriter(
}
}

def updateMetadataWithTransaction(update: => Configuration): Unit = {
deltaLog.withNewTransaction { txn =>
if (txn.metadata.partitionColumns.nonEmpty) {
throw AnalysisExceptionFactory.create(partitionedTableExceptionMsg)
}

val config = update
val updatedConfig = config.foldLeft(txn.metadata.configuration) { case (accConf, (k, v)) =>
accConf.updated(k, v)
}
val updatedMetadata = txn.metadata.copy(configuration = updatedConfig)

val op = DeltaOperations.SetTableProperties(config)
txn.updateMetadata(updatedMetadata)
txn.commit(Seq.empty, op)
}
}

private def updateReplicatedFiles(tableChanges: TableChanges): Seq[Action] = {

val revision = tableChanges.updatedRevision
@@ -168,7 +181,7 @@
addFiles.map(_.copy(dataChange = !rearrangeOnly)) ++
deletedFiles.map(_.copy(dataChange = !rearrangeOnly))
} else {
newFiles ++ deletedFiles
addFiles ++ deletedFiles
}

if (isOptimizeOperation) {
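The merge semantics of the configuration fold above: keys in `update` are added or overwrite existing entries, and everything else is preserved. A self-contained illustration (keys are examples, not from this PR):

```scala
val existing = Map("k1" -> "a", "k2" -> "b")
val update = Map("k2" -> "B", "k3" -> "c")

// Same fold as in updateMetadataWithTransaction.
val merged = update.foldLeft(existing) { case (acc, (k, v)) => acc.updated(k, v) }
// merged == Map("k1" -> "a", "k2" -> "B", "k3" -> "c")
```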