Issue #398: Fix small overhead added during the refactoring (#436)
* Fix QbeastStats serialization/deserialization
JosepSampe authored Oct 17, 2024
1 parent c4e2859 commit 686c183
Showing 8 changed files with 208 additions and 32 deletions.
core/src/main/scala/io/qbeast/core/model/QbeastStats.scala (2 changes: 1 addition & 1 deletion)
@@ -19,4 +19,4 @@ case class QbeastStats(
     numRecords: Long,
     minValues: Map[String, String],
     maxValues: Map[String, String],
-    nullCount: Map[String, Int])
+    nullCount: Map[String, String])
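
Note on the change: keeping nullCount as Map[String, String] makes all three stats maps uniform, so the custom Jackson (de)serializers below can treat every entry the same way and re-encode numeric strings as numbers on write. A minimal sketch (not from the commit) of the shape this models, using the values from the new test:

    // Sketch only: all three maps are stringly typed after this change.
    val stats = QbeastStats(
      numRecords = 3L,
      minValues = Map("a" -> "A", "b" -> "1"),
      maxValues = Map("a" -> "C", "b" -> "3"),
      nullCount = Map("a" -> "0", "b" -> "0")) // previously Map[String, Int]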
DeltaMetadataWriter.scala
@@ -20,6 +20,7 @@ import io.qbeast.core.model.IndexFile
 import io.qbeast.core.model.PreCommitHook
 import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
 import io.qbeast.core.model.QTableID
+import io.qbeast.core.model.QbeastFile
 import io.qbeast.core.model.QbeastHookLoader
 import io.qbeast.core.model.RevisionID
 import io.qbeast.core.model.TableChanges
@@ -151,10 +152,9 @@ private[delta] case class DeltaMetadataWriter(
    * @return
    *   A Map[String, String] representing the combined outputs of all hooks.
    */
-  private def runPreCommitHooks(actions: Seq[Action]): PreCommitHookOutput = {
-    val qbeastActions = actions.map(DeltaQbeastFileUtils.fromAction)
+  private def runPreCommitHooks(actions: Seq[QbeastFile]): PreCommitHookOutput = {
     preCommitHooks.foldLeft(Map.empty[String, String]) { (acc, hook) =>
-      acc ++ hook.run(qbeastActions)
+      acc ++ hook.run(actions)
     }
   }

@@ -173,21 +173,24 @@
     // Register metrics to use in the Commit Info
     val statsTrackers = createStatsTrackers(txn)
     registerStatsTrackers(statsTrackers)
-    // Execute write
-    val (changes, indexFiles, deleteFiles) = writer
+
+    // Execute write
+    val (tableChanges, indexFiles, deleteFiles) = writer
     val addFiles = indexFiles.map(DeltaQbeastFileUtils.toAddFile(dataChange = true))
     val removeFiles = deleteFiles.map(DeltaQbeastFileUtils.toRemoveFile(dataChange = false))

     // Update Qbeast Metadata (replicated set, revision..)
-    var actions = updateMetadata(txn, changes, addFiles, removeFiles)
+    var actions = updateMetadata(txn, tableChanges, addFiles, removeFiles)
     // Set transaction identifier if specified
     for (txnVersion <- options.txnVersion; txnAppId <- options.txnAppId) {
       actions +:= SetTransaction(txnAppId, txnVersion, Some(System.currentTimeMillis()))
     }

     // Run pre-commit hooks
-    val tags = runPreCommitHooks(actions)
+    val revision = tableChanges.updatedRevision
+    val dimensionCount = revision.transformations.length
+    val qbeastActions = actions.map(DeltaQbeastFileUtils.fromAction(dimensionCount))
+    val tags = runPreCommitHooks(qbeastActions)

     // Commit the information to the DeltaLog
     val op =
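
For context on runPreCommitHooks above: the foldLeft merges each hook's output map left to right, so a later hook can overwrite a tag emitted by an earlier one. A standalone sketch (illustrative tag names only):

    // How foldLeft combines per-hook outputs into a single tag map.
    val hookOutputs = Seq(Map("tagA" -> "1"), Map("tagB" -> "2"), Map("tagA" -> "3"))
    val tags = hookOutputs.foldLeft(Map.empty[String, String])(_ ++ _)
    // tags == Map("tagA" -> "3", "tagB" -> "2"): the rightmost "tagA" wins.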
DeltaQbeastFileUtils.scala

@@ -135,9 +135,9 @@ private[delta] object DeltaQbeastFileUtils {
    * @param action
    *   the action instance
    */
-  def fromAction(action: Action): QbeastFile = {
+  def fromAction(dimensionCount: Int)(action: Action): QbeastFile = {
     action match {
-      case addFile: AddFile => fromAddFile(1)(addFile)
+      case addFile: AddFile => fromAddFile(dimensionCount)(addFile)
       case removeFile: RemoveFile => fromRemoveFile(removeFile)
       case _ => null
     }
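
The second parameter list makes fromAction curried, so callers can fix the dimension count once and map the resulting function over a whole action sequence, instead of the previous hard-coded count of 1. A sketch of the call pattern (names as in the hunks above):

    // Partial application: bind dimensionCount once, reuse per action.
    val toQbeastFile: Action => QbeastFile =
      DeltaQbeastFileUtils.fromAction(dimensionCount)
    val qbeastActions = actions.map(toQbeastFile)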
DeltaQbeastSnapshot.scala

@@ -49,9 +49,9 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De
   */
   override def isInitial: Boolean = snapshot.version == -1

-  override val schema: StructType = snapshot.metadata.schema
+  override lazy val schema: StructType = snapshot.metadata.schema

-  override val allFilesCount: Long = snapshot.allFiles.count
+  override lazy val allFilesCount: Long = snapshot.allFiles.count

   private val metadataMap: Map[String, String] = snapshot.metadata.configuration
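
This is the heart of the overhead fix: as eager vals, snapshot.allFiles.count triggered a count job every time a DeltaQbeastSnapshot was constructed, even if the value was never read. lazy val defers the work to first access and caches the result. A self-contained sketch of the semantics (illustrative class, not the Qbeast one):

    // lazy val runs its initializer once, on first access, then caches.
    class SnapshotLike {
      lazy val allFilesCount: Long = {
        println("counting files...") // stands in for the expensive count job
        42L
      }
    }
    val s = new SnapshotLike // prints nothing: no count is triggered
    s.allFilesCount          // prints "counting files..." and returns 42
    s.allFilesCount          // cached: returns 42 without recomputing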
QbeastStatsUtils.scala

@@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonGenerator
 import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.core.JsonParser
 import com.fasterxml.jackson.databind.module.SimpleModule
+import com.fasterxml.jackson.databind.node.JsonNodeType
 import com.fasterxml.jackson.databind.DeserializationContext
 import com.fasterxml.jackson.databind.JsonDeserializer
 import com.fasterxml.jackson.databind.JsonMappingException
@@ -30,9 +31,12 @@ import io.qbeast.core.model.mapper
 import io.qbeast.core.model.QbeastStats

 private[delta] object QbeastStatsUtils {
-  private val module = new SimpleModule()
-  module.addSerializer(classOf[String], new ValueSerializer)
-  module.addDeserializer(classOf[String], new ValueDeserializer)
+
+  val module: SimpleModule = new SimpleModule()
+    .addSerializer(classOf[String], new ValueSerializer)
+    .addDeserializer(classOf[String], new ValueDeserializer)
+    .addDeserializer(classOf[Map[String, String]], new MapDeserializer)
+
   mapper.registerModule(module)

   def fromString(jsonString: String): Option[QbeastStats] = {
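
A hedged usage sketch of the public entry points (fromString appears in this hunk; toString is exercised by the new test below):

    // Round-trip a Delta stats string through the registered module.
    val parsed: Option[QbeastStats] = QbeastStatsUtils.fromString(
      """{"numRecords":3,"minValues":{"a":"A"},"maxValues":{"a":"C"},"nullCount":{"a":0}}""")
    parsed.foreach(stats => println(QbeastStatsUtils.toString(stats)))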
@@ -55,24 +59,49 @@

 }

-class ValueSerializer extends JsonSerializer[String] {
+class ValueSerializer extends JsonSerializer[Any] {

   override def serialize(
-      value: String,
+      value: Any,
       gen: JsonGenerator,
       serializers: SerializerProvider): Unit = {
-    try {
-      val intValue = value.toInt
-      gen.writeNumber(intValue)
-    } catch {
-      case _: NumberFormatException =>
-        try {
-          val doubleValue = value.toDouble
-          gen.writeNumber(doubleValue)
-        } catch {
-          case _: NumberFormatException =>
-            gen.writeString(value)
-        }
-    }
+    value match {
+      case m: Map[_, _] =>
+        gen.writeStartObject()
+        m.foreach { case (key, v) =>
+          gen.writeFieldName(key.toString)
+          v match {
+            case nestedMap: Map[_, _] =>
+              serialize(nestedMap, gen, serializers)
+            case s: String =>
+              try {
+                val jsonNode = mapper.readTree(s)
+                gen.writeTree(jsonNode)
+              } catch {
+                case _: Exception =>
+                  gen.writeString(s)
+              }
+            case i: Int => gen.writeNumber(i)
+            case l: Long => gen.writeNumber(l)
+            case d: Double => gen.writeNumber(d)
+            case other => gen.writeString(other.toString)
+          }
+        }
+        gen.writeEndObject()
+
+      case s: String =>
+        try {
+          val jsonNode = mapper.readTree(s)
+          gen.writeTree(jsonNode)
+        } catch {
+          case _: Exception =>
+            gen.writeString(s)
+        }
+
+      case i: Int => gen.writeNumber(i)
+      case l: Long => gen.writeNumber(l)
+      case d: Double => gen.writeNumber(d)
+      case other => gen.writeString(other.toString)
+    }
   }

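In short, the serializer now dispatches on the runtime type instead of only attempting numeric parses on strings. The observable behavior, as pinned down by the new test below (assuming the module is registered on mapper):

    // Numeric or JSON-shaped strings become JSON values; others stay strings.
    mapper.writeValueAsString("123")   // "123" (a JSON number)
    mapper.writeValueAsString("hello") // "\"hello\"" (a JSON string)
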
@@ -92,3 +121,26 @@ class ValueDeserializer extends JsonDeserializer[String] {
   }

 }
+
+class MapDeserializer extends JsonDeserializer[Map[String, String]] {
+
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): Map[String, String] = {
+    val node = p.getCodec.readTree[JsonNode](p)
+    val mapBuilder = scala.collection.mutable.Map[String, String]()
+
+    if (node.isObject) {
+      node.fields().forEachRemaining { entry =>
+        val key = entry.getKey
+        val valueNode = entry.getValue
+        if (valueNode.getNodeType == JsonNodeType.OBJECT) {
+          mapBuilder(key) = valueNode.toString
+        } else {
+          mapBuilder(key) = valueNode.asText()
+        }
+      }
+    }
+
+    mapBuilder.toMap
+  }
+
+}
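
The effect, mirrored by the MapDeserializer test below: scalar values come back via asText(), while nested objects are preserved as their raw JSON text:

    // Nested objects survive as raw JSON strings inside the Map.
    val m = mapper.readValue(
      """{"key1":"value1","key2":{"innerKey":"innerValue"}}""",
      classOf[Map[String, String]])
    // m("key1") == "value1"; m("key2") == """{"innerKey":"innerValue"}"""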
delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala (new file, 102 additions)
@@ -0,0 +1,102 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.qbeast.core.model.mapper
import io.qbeast.core.model.QbeastStats
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class DeltaQbeastStatsTest extends AnyFlatSpec with Matchers {

mapper.registerModule(QbeastStatsUtils.module)

def areJsonEqual(json1: String, json2: String): Boolean = {
val basicMapper = new ObjectMapper()

try {
val node1: JsonNode = basicMapper.readTree(json1)
val node2: JsonNode = basicMapper.readTree(json2)

node1.equals(node2)
} catch {
case e: Exception =>
println(s"Error parsing JSON: ${e.getMessage}")
false
}
}

"ValueSerializer" should "serialize a numeric string to a number" in {
val json = mapper.writeValueAsString("123") // This uses ValueSerializer
json shouldEqual "123" // It should output a number in JSON
}

it should "serialize a non-numeric string as is" in {
val json = mapper.writeValueAsString("hello") // This uses ValueSerializer
json shouldEqual "\"hello\""
}

"ValueDeserializer" should "deserialize a number string to a string" in {
val value = mapper.readValue("123", classOf[String]) // This uses ValueDeserializer
value shouldEqual "123"
}

it should "deserialize a textual value as is" in {
val value = mapper.readValue("\"hello\"", classOf[String]) // This uses ValueDeserializer
value shouldEqual "hello"
}

"MapDeserializer" should "deserialize a JSON object to a Map[String, String]" in {
val json = """{"key1": "value1", "key2": {"innerKey": "innerValue"}}"""
val result = mapper.readValue(json, classOf[Map[String, String]]) // This uses MapDeserializer

result should contain("key1" -> "value1")
result should contain("key2" -> """{"innerKey":"innerValue"}""")
}

"QbeastStatsUtils" should "serialize and deserialize correctly" in {
val jsonString =
"""{"numRecords":52041,"minValues":{"key1": "value1", "key2": {"innerKey": "innerValue"}},
|"maxValues":{"key3": "value3", "key4": "value4"},"nullCount":{"key5": 0, "key6": 2}}""".stripMargin

// Create the expected QbeastStats object
val expectedStats = QbeastStats(
numRecords = 52041,
minValues = Map("key2" -> """{"innerKey":"innerValue"}""", "key1" -> "value1"),
maxValues = Map("key3" -> "value3", "key4" -> "value4"),
nullCount = Map("key5" -> "0", "key6" -> "2"))

// Deserialize the JSON string
val deserializedStats = QbeastStatsUtils.fromString(jsonString)

// Verify that deserialization was successful and matches expected values
deserializedStats shouldBe defined // Ensure deserialization was successful

deserializedStats.foreach { ds =>
ds.numRecords shouldEqual expectedStats.numRecords
ds.minValues should contain theSameElementsAs expectedStats.minValues
ds.maxValues should contain theSameElementsAs expectedStats.maxValues
ds.nullCount should contain theSameElementsAs expectedStats.nullCount
}

// Serialize back to JSON string and verify it matches the original
val serializedJsonString = QbeastStatsUtils.toString(deserializedStats.get)
areJsonEqual(serializedJsonString, jsonString) shouldBe true
}

}
src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala (5 changes: 2 additions & 3 deletions)
@@ -58,15 +58,14 @@ object QbeastBaseRelation {
     val spark = SparkSession.active
     val tableID = table.tableID
     val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
-    val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
     if (snapshot.isInitial) {
       // If the Table is initial, read empty relation
       // This could happen if we CREATE/REPLACE TABLE without inserting data
       // In this case, we use the options variable
       new HadoopFsRelation(
         EmptyFileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
-        dataSchema = schema,
+        dataSchema = snapshot.schema,
         bucketSpec = None,
         new ParquetFileFormat(),
         options)(spark) with InsertableRelation {
@@ -87,7 +86,7 @@
       new HadoopFsRelation(
         fileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
-        dataSchema = schema,
+        dataSchema = snapshot.schema,
         bucketSpec = bucketSpec,
         file,
         parameters)(spark) with InsertableRelation {
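
Reading the schema off the already-loaded snapshot drops the separate loadCurrentSchema call, one fewer metadata round trip per relation. A before/after sketch (names from the hunks above):

    // Before: two metadata lookups per relation.
    //   val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
    //   val schema   = QbeastContext.metadataManager.loadCurrentSchema(tableID)
    // After: one lookup; the (now lazy) schema rides along on the snapshot.
    val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
    val dataSchema = snapshot.schema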
QbeastDeltaIntegrationTest.scala

@@ -15,6 +15,8 @@
  */
 package io.qbeast.spark.delta

+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
 import io.delta.tables._
 import io.qbeast.QbeastIntegrationTestSpec
 import org.apache.spark.sql.delta.DeltaLog
@@ -31,6 +33,21 @@ class QbeastDeltaIntegrationTest extends QbeastIntegrationTestSpec {
     Seq(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")
   }

+  def areJsonEqual(json1: String, json2: String): Boolean = {
+    val basicMapper = new ObjectMapper()
+
+    try {
+      val node1: JsonNode = basicMapper.readTree(json1)
+      val node2: JsonNode = basicMapper.readTree(json2)
+
+      node1.equals(node2)
+    } catch {
+      case e: Exception =>
+        println(s"Error parsing JSON: ${e.getMessage}")
+        false
+    }
+  }
+
   "Qbeast" should "output correctly Operation Metrics in Delta History" in
     withQbeastContextSparkAndTmpDir((spark, tmpDir) => {

@@ -63,10 +80,13 @@
       val stats =
         DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot.allFiles.collect().map(_.stats)
       stats.length shouldBe >(0)
-      stats.head shouldBe "{\"numRecords\":3,\"minValues\":{\"a\":\"A\",\"b\":1}," +
-        "\"maxValues\":{\"a\":\"C\",\"b\":3}," +
-        "\"nullCount\":{\"a\":0,\"b\":0}}"
+
+      val expectedStats =
+        """{"numRecords":3,"minValues":{"a":"A","b":1},
+          |"maxValues":{"a":"C","b":3},
+          |"nullCount":{"a":0,"b":0}}""".stripMargin
+
+      areJsonEqual(stats.head, expectedStats) shouldBe true
     })

   it should "not write stats when specified" in withExtendedSparkAndTmpDir(
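
Switching from exact string equality to tree comparison makes the assertion robust to key order and whitespace; Jackson's JsonNode.equals compares object fields structurally. A quick standalone illustration (not from the commit):

    // Field order no longer matters once both sides are parsed into trees.
    val om = new com.fasterxml.jackson.databind.ObjectMapper()
    om.readTree("""{"a":1,"b":2}""") == om.readTree("""{"b":2,"a":1}""") // true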
