Skip to content

Commit

Permalink
Issue Qbeast-io#418: Abstract RollupDataWriter and QbeastStats (Qbeast-io#423)
Browse files Browse the repository at this point in the history

* Update rollup data writer

* Abstract QbeastStats

* Update QbeastStats to use Number

* Update QbeastStats to use String

* Update QbeastStats

* Update QbeastStats Delta
  • Loading branch information
JosepSampe committed Oct 24, 2024
1 parent 1e1baeb commit 566b9cd
Show file tree
Hide file tree
Showing 23 changed files with 264 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/context/QbeastContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package io.qbeast.context
import io.qbeast.core.keeper.Keeper
import io.qbeast.core.keeper.LocalKeeper
import io.qbeast.core.model._
import io.qbeast.spark.delta.writer.DeltaRollupDataWriter
import io.qbeast.spark.delta.DeltaMetadataManager
import io.qbeast.spark.delta.DeltaRollupDataWriter
import io.qbeast.spark.delta.DeltaStagingDataManagerFactory
import io.qbeast.spark.index.SparkColumnsToIndexSelector
import io.qbeast.spark.index.SparkOTreeManager
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/io/qbeast/core/model/IndexFileBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ final class IndexFileBuilder {
private var modificationTime: Long = 0L
private var revisionId: RevisionID = 0L
private val blocks = immutable.Seq.newBuilder[VolatileBlock]
private var stats: Option[String] = None
private var stats: Option[QbeastStats] = None

/**
* Sets the path.
Expand Down Expand Up @@ -62,7 +62,7 @@ final class IndexFileBuilder {
* @return
* this instance
*/
def setStats(stats: Option[String]): IndexFileBuilder = {
def setStats(stats: Option[QbeastStats]): IndexFileBuilder = {
this.stats = stats
this
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/core/model/QbeastFiles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class IndexFile(
modificationTime: Long,
revisionId: RevisionID,
blocks: IISeq[Block],
stats: Option[String] = null)
stats: Option[QbeastStats] = None)
extends QbeastFile {

/**
Expand Down
22 changes: 22 additions & 0 deletions src/main/scala/io/qbeast/core/model/QbeastStats.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.model

/**
 * Format-independent statistics attached to an index file.
 *
 * NOTE(review): the field names match the JSON statistics string stored in Delta's
 * `AddFile.stats`; the maps are presumably keyed by column name -- confirm against the
 * producers of these stats.
 *
 * @param numRecords
 *   the record count (presumably the number of rows in the file -- confirm)
 * @param minValues
 *   minimum values, kept in their String form
 * @param maxValues
 *   maximum values, kept in their String form
 * @param nullCount
 *   null counts
 */
case class QbeastStats(
numRecords: Long,
minValues: Map[String, String],
maxValues: Map[String, String],
nullCount: Map[String, Int])
3 changes: 2 additions & 1 deletion src/main/scala/io/qbeast/core/model/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.qbeast.core
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.scala.ClassTagExtensions
import com.fasterxml.jackson.module.scala.DefaultScalaModule

package object model {
Expand All @@ -36,7 +37,7 @@ package object model {
.addModule(DefaultScalaModule)
.serializationInclusion(Include.NON_ABSENT)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.build()
.build() :: ClassTagExtensions
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import io.qbeast.core.model.QTableID
import io.qbeast.core.model.QbeastHookLoader
import io.qbeast.core.model.RevisionID
import io.qbeast.core.model.TableChanges
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
import io.qbeast.spark.utils.TagColumns
import io.qbeast.spark.writer.StatsTracker.registerStatsTrackers
import org.apache.spark.internal.Logging
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DeltaCommand
Expand Down
105 changes: 105 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/DeltaRollupDataWriter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta

import io.qbeast.core.model._
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.writer.RollupDataWriter
import io.qbeast.spark.writer.StatsTracker
import io.qbeast.spark.writer.TaskStats
import io.qbeast.IISeq
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.stats.DeltaFileStatistics
import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
import org.apache.spark.sql.delta.DeltaStatsCollectionUtils
import org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
import org.apache.spark.sql.execution.datasources.WriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.WriteTaskStats
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame

import java.net.URI

/**
 * Delta flavour of the rollup data writer.
 *
 * Data is written through the inherited `internalWrite`, the collected task statistics are
 * handed to the job-level trackers, and every produced file gets its stats replaced by the
 * ones recorded by the Delta file statistics tracker (when such a tracker exists).
 */
object DeltaRollupDataWriter extends RollupDataWriter with DeltaStatsCollectionUtils {

  override type GetCubeMaxWeight = CubeId => Weight
  override type Extract = InternalRow => (InternalRow, Weight, CubeId, CubeId)
  override type WriteRows = Iterator[InternalRow] => Iterator[(IndexFile, TaskStats)]

  /**
   * Writes the data and returns the resulting index files with corrected statistics.
   *
   * @param tableId
   *   the table identifier
   * @param schema
   *   the schema passed to the underlying write
   * @param data
   *   the data to write
   * @param tableChanges
   *   the table changes; its updated revision provides the dimension count
   * @return
   *   the written index files
   */
  override def write(
      tableId: QTableID,
      schema: StructType,
      data: DataFrame,
      tableChanges: TableChanges): IISeq[IndexFile] = {
    val dimensionCount = tableChanges.updatedRevision.transformations.length

    val statsTrackers = StatsTracker.getStatsTrackers
    val fileStatsTracker = getFileStatsTracker(tableId, data)
    val allTrackers = statsTrackers ++ fileStatsTracker

    val written = internalWrite(tableId, schema, data, tableChanges, allTrackers)
    processStats(written.map(_._2), statsTrackers, fileStatsTracker)

    // IndexFile -> AddFile -> AddFile with tracker stats -> IndexFile
    written.map { case (indexFile, _) =>
      val addFile = QbeastFileUtils.toAddFile(dataChange = true)(indexFile)
      val corrected = correctAddFileStats(fileStatsTracker)(addFile)
      QbeastFileUtils.fromAddFile(dimensionCount)(corrected)
    }
  }

  /**
   * Builds the optional Delta statistics tracker over the columns of `data` that are not
   * Qbeast columns.
   */
  private def getFileStatsTracker(
      tableId: QTableID,
      data: DataFrame): Option[DeltaJobStatisticsTracker] = {
    val userColumns = data.schema.collect {
      case field if !QbeastColumns.contains(field.name) => field.name
    }
    val userData = data.selectExpr(userColumns: _*)
    getDeltaOptionalTrackers(userData, data.sparkSession, tableId)
  }

  /**
   * Splits the task statistics by kind and forwards them to the matching trackers, together
   * with the latest end time among all tasks (0 when there are none).
   */
  private def processStats(
      stats: IISeq[TaskStats],
      statsTrackers: Seq[WriteJobStatsTracker],
      fileStatsTracker: Option[DeltaJobStatisticsTracker]): Unit = {
    val taskStats = stats.flatMap(_.writeTaskStats)
    val basicStats = taskStats.filter(_.isInstanceOf[BasicWriteTaskStats])
    val fileStats = taskStats.filter(_.isInstanceOf[DeltaFileStatistics])
    val endTime = stats.foldLeft(0L)((latest, task) => math.max(latest, task.endTime))

    statsTrackers.foreach(_.processStats(basicStats, endTime))
    fileStatsTracker.foreach(_.processStats(fileStats, endTime))
  }

  /**
   * Replaces the stats of the file with the ones recorded by the tracker for the file path.
   * The file is returned untouched when there is no tracker.
   */
  private def correctAddFileStats(fileStatsTracker: Option[DeltaJobStatisticsTracker])(
      file: AddFile): AddFile = {
    // The path is resolved up front, regardless of whether a tracker is present
    val path = new Path(new URI(file.path)).toString
    fileStatsTracker match {
      case Some(tracker) => file.copy(stats = tracker.recordedStats(path))
      case None => file
    }
  }

}
9 changes: 7 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/QbeastFileUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ object QbeastFileUtils {
* an IndexFile instance
*/
def fromAddFile(dimensionCount: Int)(addFile: AddFile): IndexFile = {
val jsonString = addFile.stats
val stats = jsonString match {
case null => None
case _ => QbeastStatsUtils.fromString(jsonString)
}
val builder = new IndexFileBuilder()
.setPath(addFile.path)
.setSize(addFile.size)
.setStats(Some(addFile.stats))
.setStats(stats)
.setModificationTime(addFile.modificationTime)
addFile.getTag(TagUtils.revision) match {
case Some(value) => builder.setRevisionId(value.toLong)
Expand Down Expand Up @@ -88,7 +93,7 @@ object QbeastFileUtils {
TagUtils.revision -> indexFile.revisionId.toString,
TagUtils.blocks -> encodeBlocks(indexFile.blocks))
val stats = Option(indexFile.stats).flatMap {
case Some(s) => Some(s)
case Some(s) => Some(QbeastStatsUtils.toString(s))
case None => None
}.orNull
AddFile(
Expand Down
94 changes: 94 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/QbeastStatsUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.module.scala.ClassTagExtensions
import io.qbeast.core.model.mapper
import io.qbeast.core.model.QbeastStats

/**
 * Conversions between [[QbeastStats]] and its JSON String representation.
 *
 * Initializing this object registers a module with [[ValueSerializer]] and
 * [[ValueDeserializer]] for String values on the shared `mapper`.
 */
object QbeastStatsUtils {

  private val module = new SimpleModule()
    .addSerializer(classOf[String], new ValueSerializer)
    .addDeserializer(classOf[String], new ValueDeserializer)

  mapper.registerModule(module)

  /**
   * Parses the given JSON String into a QbeastStats instance.
   *
   * @param jsonString
   *   the JSON text to parse
   * @return
   *   the parsed stats, or None when parsing or mapping fails; the failure message is printed
   *   to the standard output
   */
  def fromString(jsonString: String): Option[QbeastStats] = {
    // Prints the failure message and yields the empty result
    def failed(message: String): Option[QbeastStats] = {
      println(message)
      None
    }

    try {
      val typedMapper = mapper.asInstanceOf[ClassTagExtensions]
      Some(typedMapper.readValue[QbeastStats](jsonString))
    } catch {
      case e: JsonParseException => failed(s"Failed to parse JSON: ${e.getMessage}")
      case e: JsonMappingException => failed(s"Error mapping JSON: ${e.getMessage}")
      case e: Exception => failed(s"An error occurred: ${e.getMessage}")
    }
  }

  /**
   * Serializes the given stats into a JSON String using the shared `mapper`.
   *
   * @param qbeastStats
   *   the stats to serialize
   * @return
   *   the JSON text
   */
  def toString(qbeastStats: QbeastStats): String = {
    mapper.writeValueAsString(qbeastStats)
  }

}

/**
 * Jackson serializer for String values that writes numeric-looking strings as JSON numbers.
 *
 * The value is parsed, in this order, as an Int, a Long and a Double, and the first successful
 * parse is written with `writeNumber`. A value that cannot be parsed as any of them is written
 * as a JSON string.
 *
 * The Long step keeps integral values outside the Int range exact. Without it such values
 * would only match the Double parse and be emitted in floating-point form (for example
 * "9223372036854775807" as 9.223372036854776E18), which loses precision above 2^53.
 */
class ValueSerializer extends JsonSerializer[String] {

  /**
   * Evaluates the given parse, turning a NumberFormatException into None. Any other exception
   * is propagated.
   */
  private def parseNumber[T](parse: => T): Option[T] =
    try Some(parse)
    catch { case _: NumberFormatException => None }

  /**
   * Writes the value as a JSON number when it parses as one, or as a JSON string otherwise.
   *
   * @param value
   *   the String value to serialize
   * @param gen
   *   the generator receiving the output
   * @param serializers
   *   the serializer provider (not used)
   */
  override def serialize(
      value: String,
      gen: JsonGenerator,
      serializers: SerializerProvider): Unit = {
    parseNumber(value.toInt) match {
      case Some(intValue) => gen.writeNumber(intValue)
      case None =>
        // Integral but too large for Int: keep it integral instead of falling back to Double
        parseNumber(value.toLong) match {
          case Some(longValue) => gen.writeNumber(longValue)
          case None =>
            parseNumber(value.toDouble) match {
              case Some(doubleValue) => gen.writeNumber(doubleValue)
              case None => gen.writeString(value)
            }
        }
    }
  }

}

/**
 * Jackson deserializer that reads a JSON number or a JSON string as a String.
 */
class ValueDeserializer extends JsonDeserializer[String] {

  /**
   * Reads the current value as a tree node and returns its textual form.
   *
   * @param p
   *   the parser positioned at the value
   * @param ct
   *   the deserialization context (not used)
   * @return
   *   the text of the number or string node
   * @throws IllegalArgumentException
   *   if the node is neither a number nor a string
   */
  override def deserialize(p: JsonParser, ct: DeserializationContext): String = {
    val node: JsonNode = p.getCodec.readTree[JsonNode](p)
    // Only scalar numbers and strings are accepted; both are returned as text
    if (!node.isNumber && !node.isTextual) {
      throw new IllegalArgumentException("Unsupported JSON type for value")
    }
    node.asText()
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.writer
package io.qbeast.spark.writer

import io.qbeast.core.model.CubeId
import io.qbeast.core.model.Weight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.writer
package io.qbeast.spark.writer

import io.qbeast.core.model.CubeId
import io.qbeast.core.model.IndexFileBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.writer
package io.qbeast.spark.writer

import io.qbeast.core.model.CubeId
import io.qbeast.core.model.IndexFile
Expand Down Expand Up @@ -85,9 +85,9 @@ private[writer] class IndexFileWriter(
val hadoopPath = new Path(output.path())
val status = hadoopPath.getFileSystem(config).getFileStatus(hadoopPath)
file
.setPath(hadoopPath.getName())
.setSize(status.getLen())
.setModificationTime(status.getModificationTime())
.setPath(hadoopPath.getName)
.setSize(status.getLen)
.setModificationTime(status.getModificationTime)
blocks.values.foreach(_.endBlock())
trackers.foreach(_.closeFile(output.path()))
val time = System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.writer
package io.qbeast.spark.writer

import io.qbeast.core.model.QTableID
import io.qbeast.core.model.RevisionID
Expand Down Expand Up @@ -61,7 +61,7 @@ private[writer] class IndexFileWriterFactory(
* a new IndexFileWriter instance
*/
def createIndexFileWriter(): IndexFileWriter = {
val path = new Path(tableId.id, s"${UUID.randomUUID()}.parquet").toString()
val path = new Path(tableId.id, s"${UUID.randomUUID()}.parquet").toString
val jobConfig = new JobConf(config.value)
val taskId = new TaskAttemptID("", 0, TaskType.REDUCE, 0, 0)
val context = new TaskAttemptContextImpl(jobConfig, taskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta.writer
package io.qbeast.spark.writer

import io.qbeast.core.model.CubeId

Expand Down Expand Up @@ -87,7 +87,7 @@ private[writer] class Rollup(limit: Double) {
size += other.size
}

override def toString(): String = s"[Group: cubeIds: ${cubeIds}, size: ${size}]"
override def toString: String = s"[Group: cubeIds: $cubeIds, size: $size]"

}

Expand Down
Loading

0 comments on commit 566b9cd

Please sign in to comment.