From 8a5853a6efcffe10b5404e67f67c95b2bf545e9c Mon Sep 17 00:00:00 2001 From: mference Date: Thu, 2 Jan 2025 09:48:21 -0500 Subject: [PATCH 1/7] Adding support for optmize-with-agg --- .../main/scala/filodb/coordinator/ProtoConverters.scala | 4 ++++ grpc/src/main/protobuf/query_service.proto | 1 + .../src/main/scala/filodb/prometheus/ast/Functions.scala | 7 ++++++- query/src/main/scala/filodb/query/PlanEnums.scala | 1 + 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala index d182e01778..0a3767fc4e 100644 --- a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala +++ b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala @@ -1065,6 +1065,8 @@ object ProtoConverters { case filodb.query.MiscellaneousFunctionId.LabelJoin => GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN case filodb.query.MiscellaneousFunctionId.HistToPromVectors => GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS + case filodb.query.MiscellaneousFunctionId.OptimizeWithAgg => + GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG } function } @@ -1077,6 +1079,8 @@ object ProtoConverters { case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN => filodb.query.MiscellaneousFunctionId.LabelJoin case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS => filodb.query.MiscellaneousFunctionId.HistToPromVectors + case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG => + filodb.query.MiscellaneousFunctionId.OptimizeWithAgg case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.UNRECOGNIZED => throw new IllegalArgumentException(s"Unrecognized MiscellaneousFunctionId ${f}") } diff --git a/grpc/src/main/protobuf/query_service.proto b/grpc/src/main/protobuf/query_service.proto index 499c887f4d..bce1197635 100644 --- a/grpc/src/main/protobuf/query_service.proto +++ b/grpc/src/main/protobuf/query_service.proto @@ -653,6 +653,7 @@ enum MiscellaneousFunctionId { LABEL_REPLACE = 0; LABEL_JOIN = 1; HIST_TO_PROM_VECTORS = 2; + OPTIMIZE_WITH_AGG = 3; } enum BinaryOperator { diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala index 4b6a9ec74c..3f8009b23e 100644 --- a/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala +++ b/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala @@ -96,6 +96,7 @@ case class Function(name: String, allParams: Seq[Expression]) extends Expression val instantFunctionIdOpt = InstantFunctionId.withNameInsensitiveOption(name) val filoFunctionIdOpt = FiloFunctionId.withNameInsensitiveOption(name) val scalarFunctionIdOpt = ScalarFunctionId.withNameInsensitiveOption(name) + val miscellaneousFunctionId = MiscellaneousFunctionId.withNameInsensitiveOption(name); if (vectorFn.isDefined) { allParams.head match { case num: ScalarExpression => val params = RangeParams(timeParams.start, timeParams.step, timeParams.end) @@ -138,7 +139,11 @@ case class Function(name: String, allParams: Seq[Expression]) extends Expression case FiloFunctionId.ChunkMetaAll => // Just get the raw chunk metadata RawChunkMeta(rangeSelector, filters, column.getOrElse("")) } - } else toSeriesPlanMisc(seriesParam, otherParams, timeParams) + } else if(miscellaneousFunctionId.isDefined) { // + toSeriesPlanMisc(seriesParam, otherParams, timeParams) // need to update this logic + } + else toSeriesPlanMisc(seriesParam, otherParams, timeParams) + } } } diff --git a/query/src/main/scala/filodb/query/PlanEnums.scala b/query/src/main/scala/filodb/query/PlanEnums.scala index 9333761b0b..c8fc1989f4 100644 --- a/query/src/main/scala/filodb/query/PlanEnums.scala +++ b/query/src/main/scala/filodb/query/PlanEnums.scala @@ -177,6 +177,7 @@ object MiscellaneousFunctionId extends Enum[MiscellaneousFunctionId] { case object LabelReplace extends MiscellaneousFunctionId("label_replace") case object LabelJoin extends MiscellaneousFunctionId("label_join") case object HistToPromVectors extends MiscellaneousFunctionId("hist_to_prom_vectors") + case object OptimizeWithAgg extends MiscellaneousFunctionId("optimize_with_agg") } sealed abstract class SortFunctionId(override val entryName: String) extends EnumEntry From 73471114fad9595939ed63a00fda61029cc26d2a Mon Sep 17 00:00:00 2001 From: mference Date: Thu, 2 Jan 2025 10:34:47 -0500 Subject: [PATCH 2/7] Removed extra checks --- .../src/main/scala/filodb/prometheus/ast/Functions.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala index 3f8009b23e..4b6a9ec74c 100644 --- a/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala +++ b/prometheus/src/main/scala/filodb/prometheus/ast/Functions.scala @@ -96,7 +96,6 @@ case class Function(name: String, allParams: Seq[Expression]) extends Expression val instantFunctionIdOpt = InstantFunctionId.withNameInsensitiveOption(name) val filoFunctionIdOpt = FiloFunctionId.withNameInsensitiveOption(name) val scalarFunctionIdOpt = ScalarFunctionId.withNameInsensitiveOption(name) - val miscellaneousFunctionId = MiscellaneousFunctionId.withNameInsensitiveOption(name); if (vectorFn.isDefined) { allParams.head match { case num: ScalarExpression => val params = RangeParams(timeParams.start, timeParams.step, timeParams.end) @@ -139,11 +138,7 @@ case class Function(name: String, allParams: Seq[Expression]) extends Expression case FiloFunctionId.ChunkMetaAll => // Just get the raw chunk metadata RawChunkMeta(rangeSelector, filters, column.getOrElse("")) } - } else if(miscellaneousFunctionId.isDefined) { // - toSeriesPlanMisc(seriesParam, otherParams, timeParams) // need to update this logic - } - else toSeriesPlanMisc(seriesParam, otherParams, timeParams) - } + } else toSeriesPlanMisc(seriesParam, otherParams, timeParams) } } From 75c8e3b2be4468b32d737bf0c072a4e1a5acfb49 Mon Sep 17 00:00:00 2001 From: mference Date: Fri, 3 Jan 2025 12:15:19 -0500 Subject: [PATCH 3/7] Added test cases and ignore materialize for no-op --- .../queryplanner/DefaultPlanner.scala | 22 +++++++++----- .../queryplanner/LogicalPlanParserSpec.scala | 30 ++++++++++++++++++- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index b9be8d5906..1ee27ffeaa 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -2,10 +2,8 @@ package filodb.coordinator.queryplanner import java.util.concurrent.ThreadLocalRandom - import akka.serialization.SerializationExtension import com.typesafe.scalalogging.StrictLogging - import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher} import filodb.core.metadata.{Dataset, DatasetOptions, Schemas} import filodb.core.query._ @@ -233,11 +231,21 @@ trait DefaultPlanner { lp: ApplyMiscellaneousFunction, forceInProcess: Boolean = false): PlanResult = { val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - if (lp.function == MiscellaneousFunctionId.HistToPromVectors) - vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) - else - vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) - vectors + if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) + { + // The `Optimize` function is a no-operation (no-op), meaning it does not perform any transformation or + // computation. However, it is necessary to pass it through to the execution plan without any modifications when + // dealing with an "Optimize with Aggregation" query. This ensures that the optimization logic is preserved + // and applied correctly during the aggregation phase, without interfering with the underlying data + // or query execution flow. + vectors + }else{ + if (lp.function == MiscellaneousFunctionId.HistToPromVectors) + vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) + else + vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) + vectors + } } def materializeApplyInstantFunctionRaw(qContext: QueryContext, diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala index 7d011421bc..5ad1b33096 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers import filodb.prometheus.ast.TimeStepParams import filodb.prometheus.parse.Parser import filodb.query.LogicalPlan.getColumnFilterGroup +import filodb.query.MiscellaneousFunctionId.OptimizeWithAgg import filodb.query.util.{ExcludeAggRule, HierarchicalQueryExperienceParams, IncludeAggRule} -import filodb.query.{Aggregate, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} +import filodb.query.{Aggregate, ApplyMiscellaneousFunction, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} class LogicalPlanParserSpec extends AnyFunSpec with Matchers { @@ -1037,4 +1038,31 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true) // not updated updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated } + + it("should correctly apply optimize_with_agg function to a Prometheus query") { + val timeParamsSec1 = TimeStepParams(1000, 10, 10000) + val query1 = """optimize_with_agg(sum(rate(mns_gmail_authenticate_request_ms{_ws_="acs-icloud",_ns_="mail-notifications",app="mail-notifications",env="prod",_type_="prom-histogram"}[5m])))""" + val lp1 = Parser.queryRangeToLogicalPlan(query1, timeParamsSec1) + lp1.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp1.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp1) shouldEqual "ApplyMiscellaneousFunction(Aggregate(PeriodicSeriesWithWindowing(RawSeries)))" + + + val timeParamsSec2 = TimeStepParams(1000, 10, 10000) + val query2 = """optimize_with_agg(sum(my_gauge:::suffix1{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) without (excludeTag1, excludeTag2))""" + val lp2 = Parser.queryRangeToLogicalPlan(query2, timeParamsSec2) + lp2.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp2.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + var query = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))" + + query = "" + val timeParamsSec3 = TimeStepParams(1000, 10, 10000) + val query3 = """optimize_with_agg(sum(my_gauge:::no_rule{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) by (notExcludeTag1))""" + val lp3 = Parser.queryRangeToLogicalPlan(query3, timeParamsSec3) + lp3.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp3.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + var query4 = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))" + } } From c597194c7c83db84b75638cb5a24185457e9f888 Mon Sep 17 00:00:00 2001 From: mference Date: Fri, 3 Jan 2025 13:16:01 -0500 Subject: [PATCH 4/7] Improved code readability --- .../scala/filodb.coordinator/queryplanner/DefaultPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 1ee27ffeaa..87a602087e 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -239,7 +239,7 @@ trait DefaultPlanner { // and applied correctly during the aggregation phase, without interfering with the underlying data // or query execution flow. vectors - }else{ + } else { if (lp.function == MiscellaneousFunctionId.HistToPromVectors) vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) else From 8f0bdb01516020afefd7cbd42c308ae500880496 Mon Sep 17 00:00:00 2001 From: mference Date: Fri, 3 Jan 2025 14:52:04 -0500 Subject: [PATCH 5/7] Fixed linting --- .../queryplanner/DefaultPlanner.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 87a602087e..fc4b96d14f 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -2,8 +2,10 @@ package filodb.coordinator.queryplanner import java.util.concurrent.ThreadLocalRandom + import akka.serialization.SerializationExtension import com.typesafe.scalalogging.StrictLogging + import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher} import filodb.core.metadata.{Dataset, DatasetOptions, Schemas} import filodb.core.query._ @@ -231,13 +233,9 @@ trait DefaultPlanner { lp: ApplyMiscellaneousFunction, forceInProcess: Boolean = false): PlanResult = { val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) - { - // The `Optimize` function is a no-operation (no-op), meaning it does not perform any transformation or - // computation. However, it is necessary to pass it through to the execution plan without any modifications when - // dealing with an "Optimize with Aggregation" query. This ensures that the optimization logic is preserved - // and applied correctly during the aggregation phase, without interfering with the underlying data - // or query execution flow. + if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) { + // Optimize` is a no-op that preserves optimization logic in + // aggregation queries, ensuring correct execution without modifying underlying data. vectors } else { if (lp.function == MiscellaneousFunctionId.HistToPromVectors) From b8a38288e3359d8b14122707d2de57476f9876f1 Mon Sep 17 00:00:00 2001 From: mference Date: Fri, 3 Jan 2025 15:04:54 -0500 Subject: [PATCH 6/7] Fixed linting --- .../filodb.coordinator/queryplanner/DefaultPlanner.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index fc4b96d14f..88ae4167fe 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -234,8 +234,8 @@ trait DefaultPlanner { forceInProcess: Boolean = false): PlanResult = { val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) { - // Optimize` is a no-op that preserves optimization logic in - // aggregation queries, ensuring correct execution without modifying underlying data. + // Optimize with aggregation is a no-op, doing no transformation. It must pass through + // the execution plan to apply optimization logic correctly during aggregation. vectors } else { if (lp.function == MiscellaneousFunctionId.HistToPromVectors) From a86b99c40f99114c2acfe4be0a2949df3f7562fb Mon Sep 17 00:00:00 2001 From: sandeep6189 Date: Sat, 4 Jan 2025 11:15:43 +0530 Subject: [PATCH 7/7] turning of checkstyle for file length --- .../scala/filodb.coordinator/queryplanner/DefaultPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 88ae4167fe..e52f999f83 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -18,7 +18,7 @@ import filodb.query.LogicalPlan._ import filodb.query.exec._ import filodb.query.exec.InternalRangeFunction.Last - +//scalastyle:off file.size.limit /** * Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the * plan building call tree during query planning.