From 414d0238495c0ad5991cd1ade19a56b06e524324 Mon Sep 17 00:00:00 2001
From: JosepSampe
Date: Fri, 4 Oct 2024 18:46:17 +0200
Subject: [PATCH 1/5] Use a cache in the Delta Metadata manager

---
 .../scala/io/qbeast/spark/delta/DeltaMetadataManager.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala b/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
index cd5e42275..d3daa3da7 100644
--- a/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
+++ b/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.SparkSession
  */
 object DeltaMetadataManager extends MetadataManager {
 
+  private val deltaLogCache = scala.collection.mutable.Map[QTableID, DeltaLog]()
+
   override def updateWithTransaction(
       tableID: QTableID,
       schema: StructType,
@@ -72,7 +74,7 @@ object DeltaMetadataManager extends MetadataManager {
    * @return
    */
   def loadDeltaLog(tableID: QTableID): DeltaLog = {
-    DeltaLog.forTable(SparkSession.active, tableID.id)
+    deltaLogCache.getOrElseUpdate(tableID, DeltaLog.forTable(SparkSession.active, tableID.id))
   }
 
   override def hasConflicts(

From 094b3f4751675f0a1984c8de44286db829ab8fa7 Mon Sep 17 00:00:00 2001
From: JosepSampe
Date: Sat, 5 Oct 2024 18:45:53 +0200
Subject: [PATCH 2/5] Make some variables lazy to reduce some overhead

---
 .../scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala | 4 ++--
 src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala | 5 ++---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 830dfaf4d..1f4db1a9e 100644
--- a/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -49,9 +49,9 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De
    */
   override def isInitial: Boolean = snapshot.version == -1
 
-  override val schema: StructType = snapshot.metadata.schema
+  override lazy val schema: StructType = snapshot.metadata.schema
 
-  override val allFilesCount: Long = snapshot.allFiles.count
+  override lazy val allFilesCount: Long = snapshot.allFiles.count
 
   private val metadataMap: Map[String, String] = snapshot.metadata.configuration
 
diff --git a/src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala b/src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala
index 3c044ffb6..4fcb9acec 100644
--- a/src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala
+++ b/src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala
@@ -58,7 +58,6 @@ object QbeastBaseRelation {
     val spark = SparkSession.active
     val tableID = table.tableID
     val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
-    val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
     if (snapshot.isInitial) {
       // If the Table is initial, read empty relation
       // This could happen if we CREATE/REPLACE TABLE without inserting data
@@ -66,7 +65,7 @@ object QbeastBaseRelation {
       new HadoopFsRelation(
         EmptyFileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
-        dataSchema = schema,
+        dataSchema = snapshot.schema,
         bucketSpec = None,
         new ParquetFileFormat(),
         options)(spark) with InsertableRelation {
@@ -87,7 +86,7 @@ object QbeastBaseRelation {
       new HadoopFsRelation(
         fileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
-        dataSchema = schema,
+        dataSchema = snapshot.schema,
         bucketSpec = bucketSpec,
         file,
         parameters)(spark) with InsertableRelation {
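
A note on the cache introduced in PATCH 1/5 (the series reverts it again in PATCH 3/5): scala.collection.mutable.Map is not thread-safe, and DeltaLog.forTable already keeps its own internal cache of logs, so the extra map mostly saves a cache lookup. If an explicit cache were still wanted, a concurrent map is the safer shape. A minimal sketch, not part of these patches — the object name is hypothetical, reusing the QTableID and DeltaLog types from the diff above:

    import scala.collection.concurrent.TrieMap

    import io.qbeast.core.model.QTableID
    import org.apache.spark.sql.delta.DeltaLog
    import org.apache.spark.sql.SparkSession

    object CachedDeltaLogLoader {

      // TrieMap is a concurrent map, so lookups and inserts need no external
      // locking; under contention the factory may run more than once, but all
      // callers end up sharing the single instance that was stored.
      private val deltaLogCache = TrieMap.empty[QTableID, DeltaLog]

      def loadDeltaLog(tableID: QTableID): DeltaLog =
        deltaLogCache.getOrElseUpdate(
          tableID,
          DeltaLog.forTable(SparkSession.active, tableID.id))

    }
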
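And on PATCH 2/5: a lazy val defers its initializer until first access and then caches the result, so a DeltaQbeastSnapshot whose schema or allFilesCount is never read no longer pays for snapshot.metadata.schema or the snapshot.allFiles.count scan. The semantics in isolation, as a self-contained example (the Snapshot class here is hypothetical, not the one in the codebase):

    object LazyValDemo extends App {

      class Snapshot {
        // Runs at construction time, whether or not anyone reads it.
        val eagerCount: Long = { println("counting files (eager)"); 42L }
        // Runs on first read only; the result is cached afterwards.
        lazy val lazyCount: Long = { println("counting files (lazy)"); 42L }
      }

      val s = new Snapshot() // prints "counting files (eager)"
      println(s.lazyCount)   // prints "counting files (lazy)", then 42
      println(s.lazyCount)   // prints 42 only; the body does not run again
    }
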
From b3b37aed1ff4c6e295d61e3b9f11f460211f2400 Mon Sep 17 00:00:00 2001
From: JosepSampe
Date: Fri, 11 Oct 2024 11:30:57 +0200
Subject: [PATCH 3/5] Fix QbeastStats serialization/deserialization

---
 .../io/qbeast/core/model/QbeastStats.scala    |   2 +-
 .../spark/delta/DeltaMetadataManager.scala    |   4 +-
 .../spark/delta/DeltaMetadataWriter.scala     |  17 +--
 .../spark/delta/DeltaQbeastFileUtils.scala    |   4 +-
 .../spark/delta/DeltaQbeastStatsUtils.scala   |  80 ++++++++++---
 .../spark/delta/DeltaQbeastStatsTest.scala    | 109 ++++++++++++++++++
 6 files changed, 189 insertions(+), 27 deletions(-)
 create mode 100644 delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala

diff --git a/core/src/main/scala/io/qbeast/core/model/QbeastStats.scala b/core/src/main/scala/io/qbeast/core/model/QbeastStats.scala
index ba086ef75..a47197ddb 100644
--- a/core/src/main/scala/io/qbeast/core/model/QbeastStats.scala
+++ b/core/src/main/scala/io/qbeast/core/model/QbeastStats.scala
@@ -19,4 +19,4 @@ case class QbeastStats(
     numRecords: Long,
     minValues: Map[String, String],
     maxValues: Map[String, String],
-    nullCount: Map[String, Int])
+    nullCount: Map[String, String])
diff --git a/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala b/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
index d3daa3da7..cd5e42275 100644
--- a/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
+++ b/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
@@ -28,8 +28,6 @@ import org.apache.spark.sql.SparkSession
  */
 object DeltaMetadataManager extends MetadataManager {
 
-  private val deltaLogCache = scala.collection.mutable.Map[QTableID, DeltaLog]()
-
   override def updateWithTransaction(
       tableID: QTableID,
       schema: StructType,
@@ -74,7 +72,7 @@ object DeltaMetadataManager extends MetadataManager {
    * @return
    */
   def loadDeltaLog(tableID: QTableID): DeltaLog = {
-    deltaLogCache.getOrElseUpdate(tableID, DeltaLog.forTable(SparkSession.active, tableID.id))
+    DeltaLog.forTable(SparkSession.active, tableID.id)
   }
 
   override def hasConflicts(
diff --git a/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala b/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
index 0731d2031..c59b405b0 100644
--- a/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
+++ b/delta/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -20,6 +20,7 @@ import io.qbeast.core.model.IndexFile
 import io.qbeast.core.model.PreCommitHook
 import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
 import io.qbeast.core.model.QTableID
+import io.qbeast.core.model.QbeastFile
 import io.qbeast.core.model.QbeastHookLoader
 import io.qbeast.core.model.RevisionID
 import io.qbeast.core.model.TableChanges
@@ -151,10 +152,9 @@ private[delta] case class DeltaMetadataWriter(
    * @return
    *   A Map[String, String] representing the combined outputs of all hooks.
    */
-  private def runPreCommitHooks(actions: Seq[Action]): PreCommitHookOutput = {
-    val qbeastActions = actions.map(DeltaQbeastFileUtils.fromAction)
+  private def runPreCommitHooks(actions: Seq[QbeastFile]): PreCommitHookOutput = {
     preCommitHooks.foldLeft(Map.empty[String, String]) { (acc, hook) =>
-      acc ++ hook.run(qbeastActions)
+      acc ++ hook.run(actions)
     }
   }
 
@@ -173,21 +173,24 @@ private[delta] case class DeltaMetadataWriter(
     // Register metrics to use in the Commit Info
     val statsTrackers = createStatsTrackers(txn)
     registerStatsTrackers(statsTrackers)
-    // Execute write
-    val (changes, indexFiles, deleteFiles) = writer
+    // Execute write
+    val (tableChanges, indexFiles, deleteFiles) = writer
     val addFiles = indexFiles.map(DeltaQbeastFileUtils.toAddFile(dataChange = true))
     val removeFiles = deleteFiles.map(DeltaQbeastFileUtils.toRemoveFile(dataChange = false))
 
     // Update Qbeast Metadata (replicated set, revision..)
-    var actions = updateMetadata(txn, changes, addFiles, removeFiles)
+    var actions = updateMetadata(txn, tableChanges, addFiles, removeFiles)
     // Set transaction identifier if specified
     for (txnVersion <- options.txnVersion; txnAppId <- options.txnAppId) {
       actions +:= SetTransaction(txnAppId, txnVersion, Some(System.currentTimeMillis()))
     }
 
     // Run pre-commit hooks
-    val tags = runPreCommitHooks(actions)
+    val revision = tableChanges.updatedRevision
+    val dimensionCount = revision.transformations.length
+    val qbeastActions = actions.map(DeltaQbeastFileUtils.fromAction(dimensionCount))
+    val tags = runPreCommitHooks(qbeastActions)
 
     // Commit the information to the DeltaLog
     val op =
diff --git a/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastFileUtils.scala b/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastFileUtils.scala
index a88b5da3e..6349767ce 100644
--- a/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastFileUtils.scala
+++ b/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastFileUtils.scala
@@ -135,9 +135,9 @@ private[delta] object DeltaQbeastFileUtils {
    * @param action
    *   the action instance
    */
-  def fromAction(action: Action): QbeastFile = {
+  def fromAction(dimensionCount: Int)(action: Action): QbeastFile = {
     action match {
-      case addFile: AddFile => fromAddFile(1)(addFile)
+      case addFile: AddFile => fromAddFile(dimensionCount)(addFile)
       case removeFile: RemoveFile => fromRemoveFile(removeFile)
       case _ => null
     }
   }
diff --git a/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastStatsUtils.scala b/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastStatsUtils.scala
index 72bc9b274..9de95789b 100644
--- a/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastStatsUtils.scala
+++ b/delta/src/main/scala/io/qbeast/spark/delta/DeltaQbeastStatsUtils.scala
@@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonGenerator
 import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.core.JsonParser
 import com.fasterxml.jackson.databind.module.SimpleModule
+import com.fasterxml.jackson.databind.node.JsonNodeType
 import com.fasterxml.jackson.databind.DeserializationContext
 import com.fasterxml.jackson.databind.JsonDeserializer
 import com.fasterxml.jackson.databind.JsonMappingException
@@ -30,9 +31,12 @@ import io.qbeast.core.model.mapper
 import io.qbeast.core.model.QbeastStats
 
 private[delta] object QbeastStatsUtils {
-  private val module = new SimpleModule()
-  module.addSerializer(classOf[String], new ValueSerializer)
-  module.addDeserializer(classOf[String], new ValueDeserializer)
+
+  val module: SimpleModule = new SimpleModule()
+    .addSerializer(classOf[String], new ValueSerializer)
+    .addDeserializer(classOf[String], new ValueDeserializer)
+    .addDeserializer(classOf[Map[String, String]], new MapDeserializer)
+
   mapper.registerModule(module)
 
   def fromString(jsonString: String): Option[QbeastStats] = {
@@ -55,24 +59,49 @@ private[delta] object QbeastStatsUtils {
 
 }
 
-class ValueSerializer extends JsonSerializer[String] {
+class ValueSerializer extends JsonSerializer[Any] {
 
   override def serialize(
-      value: String,
+      value: Any,
       gen: JsonGenerator,
       serializers: SerializerProvider): Unit = {
-    try {
-      val intValue = value.toInt
-      gen.writeNumber(intValue)
-    } catch {
-      case _: NumberFormatException =>
+    value match {
+      case m: Map[_, _] =>
+        gen.writeStartObject()
+        m.foreach { case (key, v) =>
+          gen.writeFieldName(key.toString)
+          v match {
+            case nestedMap: Map[_, _] =>
+              serialize(nestedMap, gen, serializers)
+            case s: String =>
+              try {
+                val jsonNode = mapper.readTree(s)
+                gen.writeTree(jsonNode)
+              } catch {
+                case _: Exception =>
+                  gen.writeString(s)
+              }
+            case i: Int => gen.writeNumber(i)
+            case l: Long => gen.writeNumber(l)
+            case d: Double => gen.writeNumber(d)
+            case other => gen.writeString(other.toString)
+          }
+        }
+        gen.writeEndObject()
+
+      case s: String =>
         try {
-          val doubleValue = value.toDouble
-          gen.writeNumber(doubleValue)
+          val jsonNode = mapper.readTree(s)
+          gen.writeTree(jsonNode)
         } catch {
-          case _: NumberFormatException =>
-            gen.writeString(value)
+          case _: Exception =>
+            gen.writeString(s)
         }
+
+      case i: Int => gen.writeNumber(i)
+      case l: Long => gen.writeNumber(l)
+      case d: Double => gen.writeNumber(d)
+      case other => gen.writeString(other.toString)
     }
   }
 
@@ -92,3 +121,26 @@ class ValueDeserializer extends JsonDeserializer[String] {
   }
 
 }
+
+class MapDeserializer extends JsonDeserializer[Map[String, String]] {
+
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): Map[String, String] = {
+    val node = p.getCodec.readTree[JsonNode](p)
+    val mapBuilder = scala.collection.mutable.Map[String, String]()
+
+    if (node.isObject) {
+      node.fields().forEachRemaining { entry =>
+        val key = entry.getKey
+        val valueNode = entry.getValue
+        if (valueNode.getNodeType == JsonNodeType.OBJECT) {
+          mapBuilder(key) = valueNode.toString
+        } else {
+          mapBuilder(key) = valueNode.asText()
+        }
+      }
+    }
+
+    mapBuilder.toMap
+  }
+
+}
diff --git a/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala b/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala
new file mode 100644
index 000000000..c8a84f471
--- /dev/null
+++ b/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021 Qbeast Analytics, S.L.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.qbeast.spark.delta
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.qbeast.core.model.mapper
+import io.qbeast.core.model.QbeastStats
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DeltaQbeastStatsTest extends AnyFlatSpec with Matchers {
+
+  mapper.registerModule(QbeastStatsUtils.module)
+
+  def areJsonEqual(json1: String, json2: String): Boolean = {
+    val basicMapper = new ObjectMapper()
+
+    try {
+      val node1: JsonNode = basicMapper.readTree(json1)
+      val node2: JsonNode = basicMapper.readTree(json2)
+
+      val normalizedJson1 =
+        basicMapper.writerWithDefaultPrettyPrinter().writeValueAsString(node1)
+      val normalizedJson2 =
+        basicMapper.writerWithDefaultPrettyPrinter().writeValueAsString(node2)
+
+      // Compare the normalized JSON strings
+      normalizedJson1 == normalizedJson2
+    } catch {
+      case e: Exception =>
+        // Handle any parsing errors
+        println(s"Error parsing JSON: ${e.getMessage}")
+        false
+    }
+  }
+
+  "ValueSerializer" should "serialize a numeric string to a number" in {
+    val json = mapper.writeValueAsString("123") // This uses ValueSerializer
+    json shouldEqual "123" // It should output a number in JSON
+  }
+
+  it should "serialize a non-numeric string as is" in {
+    val json = mapper.writeValueAsString("hello") // This uses ValueSerializer
+    json shouldEqual "\"hello\""
+  }
+
+  "ValueDeserializer" should "deserialize a number string to a string" in {
+    val value = mapper.readValue("123", classOf[String]) // This uses ValueDeserializer
+    value shouldEqual "123"
+  }
+
+  it should "deserialize a textual value as is" in {
+    val value = mapper.readValue("\"hello\"", classOf[String]) // This uses ValueDeserializer
+    value shouldEqual "hello"
+  }
+
+  "MapDeserializer" should "deserialize a JSON object to a Map[String, String]" in {
+    val json = """{"key1": "value1", "key2": {"innerKey": "innerValue"}}"""
+    val result = mapper.readValue(json, classOf[Map[String, String]]) // This uses MapDeserializer
+
+    result should contain("key1" -> "value1")
+    result should contain("key2" -> """{"innerKey":"innerValue"}""")
+  }
+
+  "QbeastStatsUtils" should "serialize and deserialize correctly" in {
+    val jsonString =
+      """{"numRecords":52041,"minValues":{"key1": "value1", "key2": {"innerKey": "innerValue"}},
+        |"maxValues":{"key3": "value3", "key4": "value4"},"nullCount":{"key5": 0, "key6": 2}}""".stripMargin
+
+    // Create the expected QbeastStats object
+    val expectedStats = QbeastStats(
+      numRecords = 52041,
+      minValues = Map("key2" -> """{"innerKey":"innerValue"}""", "key1" -> "value1"),
+      maxValues = Map("key3" -> "value3", "key4" -> "value4"),
+      nullCount = Map("key5" -> "0", "key6" -> "2"))
+
+    // Deserialize the JSON string
+    val deserializedStats = QbeastStatsUtils.fromString(jsonString)
+
+    // Verify that deserialization was successful and matches expected values
+    deserializedStats shouldBe defined // Ensure deserialization was successful
+
+    deserializedStats.foreach { ds =>
+      ds.numRecords shouldEqual expectedStats.numRecords
+      ds.minValues should contain theSameElementsAs expectedStats.minValues
+      ds.maxValues should contain theSameElementsAs expectedStats.maxValues
+      ds.nullCount should contain theSameElementsAs expectedStats.nullCount
+    }
+
+    // Serialize back to JSON string and verify it matches the original
+    val serializedJsonString = QbeastStatsUtils.toString(deserializedStats.get)
+    areJsonEqual(serializedJsonString, jsonString)
+  }
+
+}
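
Stepping back from the diff in PATCH 3/5: Delta's per-file stats JSON mixes value shapes (numbers, strings, nested objects), which is why nullCount becomes Map[String, String] and the custom Jackson module decides, per value, how to write it back out. The core trick — emit a numeric-looking string as a JSON number and anything else as a string — in a standalone sketch (illustrative only, not the project's mapper; assumes Scala 2.13 for toLongOption):

    import com.fasterxml.jackson.core.JsonGenerator
    import com.fasterxml.jackson.databind.module.SimpleModule
    import com.fasterxml.jackson.databind.JsonSerializer
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.SerializerProvider

    object StatsJsonDemo extends App {

      // Same idea as ValueSerializer above, reduced to the essentials.
      class NumericStringSerializer extends JsonSerializer[String] {
        override def serialize(
            value: String,
            gen: JsonGenerator,
            sp: SerializerProvider): Unit =
          value.toLongOption match {
            case Some(n) => gen.writeNumber(n) // "0" -> 0, "52041" -> 52041
            case None => gen.writeString(value) // "value1" stays a string
          }
      }

      val mapper = new ObjectMapper()
      mapper.registerModule(
        new SimpleModule().addSerializer(classOf[String], new NumericStringSerializer))

      println(mapper.writeValueAsString("52041")) // prints: 52041
      println(mapper.writeValueAsString("value1")) // prints: "value1"
    }
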
From 0186ea1f2066f2e29d4b83fa70d90496200b9db1 Mon Sep 17 00:00:00 2001
From: JosepSampe
Date: Fri, 11 Oct 2024 12:16:02 +0200
Subject: [PATCH 4/5] Fix QbeastStats serialization/deserialization tests

---
 .../spark/delta/DeltaQbeastStatsTest.scala    | 11 ++---------
 .../io/qbeast/QbeastIntegrationTestSpec.scala | 17 +++++++++++++++++
 .../delta/QbeastDeltaIntegrationTest.scala    | 10 +++++++---
 3 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala b/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala
index c8a84f471..b653c858b 100644
--- a/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala
+++ b/delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala
@@ -33,16 +33,9 @@ class DeltaQbeastStatsTest extends AnyFlatSpec with Matchers {
       val node1: JsonNode = basicMapper.readTree(json1)
       val node2: JsonNode = basicMapper.readTree(json2)
 
-      val normalizedJson1 =
-        basicMapper.writerWithDefaultPrettyPrinter().writeValueAsString(node1)
-      val normalizedJson2 =
-        basicMapper.writerWithDefaultPrettyPrinter().writeValueAsString(node2)
-
-      // Compare the normalized JSON strings
-      normalizedJson1 == normalizedJson2
+      node1.equals(node2)
     } catch {
       case e: Exception =>
-        // Handle any parsing errors
         println(s"Error parsing JSON: ${e.getMessage}")
         false
     }
@@ -103,7 +96,7 @@ class DeltaQbeastStatsTest extends AnyFlatSpec with Matchers {
 
     // Serialize back to JSON string and verify it matches the original
     val serializedJsonString = QbeastStatsUtils.toString(deserializedStats.get)
-    areJsonEqual(serializedJsonString, jsonString)
+    areJsonEqual(serializedJsonString, jsonString) shouldBe true
   }
 
 }
diff --git a/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala b/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala
index b7b2aeef2..b137fc707 100644
--- a/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala
+++ b/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala
@@ -15,6 +15,8 @@
  */
 package io.qbeast
 
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
 import com.github.mrpowers.spark.fast.tests.DatasetComparer
 import io.qbeast.context.QbeastContext
 import io.qbeast.context.QbeastContextImpl
@@ -215,4 +217,19 @@ trait QbeastIntegrationTestSpec extends AnyFlatSpec with Matchers with DatasetCo
     QbeastContext.metadataManager.loadSnapshot(tableId)
   }
 
+  def areJsonEqual(json1: String, json2: String): Boolean = {
+    val basicMapper = new ObjectMapper()
+
+    try {
+      val node1: JsonNode = basicMapper.readTree(json1)
+      val node2: JsonNode = basicMapper.readTree(json2)
+
+      node1.equals(node2)
+    } catch {
+      case e: Exception =>
+        println(s"Error parsing JSON: ${e.getMessage}")
+        false
+    }
+  }
+
 }
diff --git a/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala b/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala
index dc2475319..e27fbd709 100644
--- a/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala
@@ -63,10 +63,14 @@ class QbeastDeltaIntegrationTest extends QbeastIntegrationTestSpec {
       val stats =
         DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot.allFiles.collect().map(_.stats)
       stats.length shouldBe >(0)
-      stats.head shouldBe "{\"numRecords\":3,\"minValues\":{\"a\":\"A\",\"b\":1}," +
-        "\"maxValues\":{\"a\":\"C\",\"b\":3}," +
-        "\"nullCount\":{\"a\":0,\"b\":0}}"
 
+      val expectedStats =
+        """{"numRecords":3,"minValues":{"a":"A","b":1},
+          |"maxValues":{"a":"C","b":3},
+          |"nullCount":{"a":0,"b":0}}""".stripMargin
+
+
+      areJsonEqual(stats.head, expectedStats) shouldBe true
     })
 
   it should "not write stats when specified" in withExtendedSparkAndTmpDir(
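
The test fix in PATCH 4/5 swaps byte-for-byte comparison of serialized stats for tree comparison: Jackson's JsonNode.equals compares object fields as a map, ignoring key order, which matters because a round-trip through Map-backed stats does not promise to preserve it. A small, standalone illustration of the difference (plain ObjectMapper, no project code):

    import com.fasterxml.jackson.databind.ObjectMapper

    object JsonEqualityDemo extends App {
      val mapper = new ObjectMapper()

      val a = mapper.readTree("""{"numRecords":3,"minValues":{"a":"A","b":1}}""")
      val b = mapper.readTree("""{"minValues":{"b":1,"a":"A"},"numRecords":3}""")

      // ObjectNode equality is field-order insensitive, so this prints true...
      println(a.equals(b))
      // ...while the serialized forms keep insertion order and still differ:
      println(a.toString == b.toString) // prints false
    }
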
"""{"numRecords":3,"minValues":{"a":"A","b":1}, + |"maxValues":{"a":"C","b":3}, + |"nullCount":{"a":0,"b":0}}""".stripMargin + + + areJsonEqual(stats.head, expectedStats) shouldBe true }) it should "not write stats when specified" in withExtendedSparkAndTmpDir( From 11b229011be223ea66df01cef33443289ca7084f Mon Sep 17 00:00:00 2001 From: JosepSampe Date: Fri, 11 Oct 2024 12:39:35 +0200 Subject: [PATCH 5/5] Fix QbeastStats serialization/deserialization tests --- .../io/qbeast/QbeastIntegrationTestSpec.scala | 17 ----------------- .../delta/QbeastDeltaIntegrationTest.scala | 18 +++++++++++++++++- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala b/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala index b137fc707..b7b2aeef2 100644 --- a/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala +++ b/src/test/scala/io/qbeast/QbeastIntegrationTestSpec.scala @@ -15,8 +15,6 @@ */ package io.qbeast -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.ObjectMapper import com.github.mrpowers.spark.fast.tests.DatasetComparer import io.qbeast.context.QbeastContext import io.qbeast.context.QbeastContextImpl @@ -217,19 +215,4 @@ trait QbeastIntegrationTestSpec extends AnyFlatSpec with Matchers with DatasetCo QbeastContext.metadataManager.loadSnapshot(tableId) } - def areJsonEqual(json1: String, json2: String): Boolean = { - val basicMapper = new ObjectMapper() - - try { - val node1: JsonNode = basicMapper.readTree(json1) - val node2: JsonNode = basicMapper.readTree(json2) - - node1.equals(node2) - } catch { - case e: Exception => - println(s"Error parsing JSON: ${e.getMessage}") - false - } - } - } diff --git a/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala b/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala index e27fbd709..a2dbc2d26 100644 --- a/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/delta/QbeastDeltaIntegrationTest.scala @@ -15,6 +15,8 @@ */ package io.qbeast.spark.delta +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper import io.delta.tables._ import io.qbeast.QbeastIntegrationTestSpec import org.apache.spark.sql.delta.DeltaLog @@ -31,6 +33,21 @@ class QbeastDeltaIntegrationTest extends QbeastIntegrationTestSpec { Seq(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b") } + def areJsonEqual(json1: String, json2: String): Boolean = { + val basicMapper = new ObjectMapper() + + try { + val node1: JsonNode = basicMapper.readTree(json1) + val node2: JsonNode = basicMapper.readTree(json2) + + node1.equals(node2) + } catch { + case e: Exception => + println(s"Error parsing JSON: ${e.getMessage}") + false + } + } + "Qbeast" should "output correctly Operation Metrics in Delta History" in withQbeastContextSparkAndTmpDir((spark, tmpDir) => { @@ -69,7 +86,6 @@ class QbeastDeltaIntegrationTest extends QbeastIntegrationTestSpec { |"maxValues":{"a":"C","b":3}, |"nullCount":{"a":0,"b":0}}""".stripMargin - areJsonEqual(stats.head, expectedStats) shouldBe true })