From c223a0e2066ecb5afe803244c6650ef8a0da0bca Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 21 Jan 2025 10:40:30 +0100 Subject: [PATCH] Move bounding to the Transformation --- .../core/transform/LinearTransformation.scala | 6 ++++++ .../core/transform/LinearTransformer.scala | 2 -- .../qbeast/core/transform/Transformation.scala | 9 +++++++++ .../io/qbeast/core/transform/Transformer.scala | 9 --------- .../qbeast/spark/index/OTreeDataAnalyzer.scala | 18 +++++++++++------- 5 files changed, 26 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala b/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala index 36ce4152e..c879fee97 100644 --- a/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala +++ b/core/src/main/scala/io/qbeast/core/transform/LinearTransformation.scala @@ -75,6 +75,12 @@ case class LinearTransformation( 1.0 / (mx - mn) } + /** + * The LinearTransformation is bounded by min and max values + * @return + */ + override def bounded: Boolean = true + override def transform(value: Any): Double = { val v = if (value == null) nullValue else value v match { diff --git a/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala b/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala index a2ac5c583..87524a0c0 100644 --- a/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala +++ b/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala @@ -50,8 +50,6 @@ case class LinearTransformer(columnName: String, dataType: QDataType) extends Tr } } - override def isBounded: Boolean = true - override def stats: ColumnStats = ColumnStats( names = Seq(colMin, colMax), diff --git a/core/src/main/scala/io/qbeast/core/transform/Transformation.scala b/core/src/main/scala/io/qbeast/core/transform/Transformation.scala index 028e408c1..add333c36 100644 --- a/core/src/main/scala/io/qbeast/core/transform/Transformation.scala +++ b/core/src/main/scala/io/qbeast/core/transform/Transformation.scala @@ -30,6 +30,15 @@ import java.time.Instant property = "className") trait Transformation extends Serializable { + /** + * Returns if the transformation is bounded within the space f.e: LinearTransformer is bounded + * with min and max (values outside the ranges can cause Indexing Errors), HashTransformer is + * not, because the transformation function is a Hash with a fixed range + * + * @return + */ + def bounded: Boolean = false + /** * Normalize an input value to a Double between 0 and 1. * @param value diff --git a/core/src/main/scala/io/qbeast/core/transform/Transformer.scala b/core/src/main/scala/io/qbeast/core/transform/Transformer.scala index 136b45b51..7d004971f 100644 --- a/core/src/main/scala/io/qbeast/core/transform/Transformer.scala +++ b/core/src/main/scala/io/qbeast/core/transform/Transformer.scala @@ -156,15 +156,6 @@ trait Transformer extends Serializable { def spec: String = s"$columnName:${transformerType.transformerSimpleName}" - /** - * Returns if the transformation is bounded within the space - * f.e: - * LinearTransformer is bounded with min and max (values outside the ranges can cause Indexing Errors), - * HashTransformer is not, because the transformation function is a Hash with a fixed range - * @return - */ - def isBounded: Boolean = false - } trait ColumnStats extends Serializable { diff --git a/core/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala b/core/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala index 912dfeb4e..369041d3b 100644 --- a/core/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala +++ b/core/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala @@ -232,11 +232,16 @@ object DoublePassOTreeDataAnalyzer dataFrame: DataFrame, revision: Revision): Unit = { // Check if the DataFrame and the Columns To Index are deterministic - val columnsToAnalyze = - revision.columnTransformers.filter(_.isBounded).map(_.columnName) - if (columnsToAnalyze.nonEmpty) { - logDebug(s"Some columnsToIndex need to come from a Deterministic Source: {${columnsToAnalyze - .mkString(",")}}. Checking the determinism of the input data") + val boundedColumnTransformations = revision.columnTransformers + .zip(revision.transformations) + .collect { + case (columnTransformer, transformation) if transformation.bounded => + columnTransformer.columnName + } + if (boundedColumnTransformations.nonEmpty) { + logDebug( + s"Some columns to index need to come from a Deterministic Source: {${boundedColumnTransformations + .mkString(",")}}. Checking the determinism of the input data") // Detect if the DataFrame's operations are deterministic val isPlanDeterministic: Boolean = isDataFramePlanDeterministic(dataFrame) @@ -252,8 +257,7 @@ object DoublePassOTreeDataAnalyzer s"3. save the DF as delta and Convert it To Qbeast in a second step") } } else { - logDebug( - s"No columnsToIndex need to come from a Deterministic Source. Skipping determinism check.") + logDebug(s"No bounded columns to index. Skipping determinism check.") } }