From 990dbf03117791d2400b199334df47f0523a3b4d Mon Sep 17 00:00:00 2001 From: MichaelFerence Date: Sat, 4 Jan 2025 02:51:23 -0500 Subject: [PATCH] feat(query): Adding support of optimize_with_agg logical plan to make use of aggregated metrics in HQE (#1923) * Adding support for optmize-with-agg logical plan Co-authored-by: mference Co-authored-by: sandeep6189 --- .../queryplanner/DefaultPlanner.scala | 18 +++++++---- .../filodb/coordinator/ProtoConverters.scala | 4 +++ .../queryplanner/LogicalPlanParserSpec.scala | 30 ++++++++++++++++++- grpc/src/main/protobuf/query_service.proto | 1 + .../main/scala/filodb/query/PlanEnums.scala | 1 + 5 files changed, 47 insertions(+), 7 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 48e161b047..0e459fa657 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -18,7 +18,7 @@ import filodb.query.LogicalPlan._ import filodb.query.exec._ import filodb.query.exec.InternalRangeFunction.Last - +//scalastyle:off file.size.limit /** * Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the * plan building call tree during query planning. @@ -234,11 +234,17 @@ trait DefaultPlanner { lp: ApplyMiscellaneousFunction, forceInProcess: Boolean = false): PlanResult = { val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - if (lp.function == MiscellaneousFunctionId.HistToPromVectors) - vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) - else - vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) - vectors + if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) { + // Optimize with aggregation is a no-op, doing no transformation. It must pass through + // the execution plan to apply optimization logic correctly during aggregation. + vectors + } else { + if (lp.function == MiscellaneousFunctionId.HistToPromVectors) + vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) + else + vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) + vectors + } } def materializeApplyInstantFunctionRaw(qContext: QueryContext, diff --git a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala index 5fee8cab4d..6334cc6839 100644 --- a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala +++ b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala @@ -1075,6 +1075,8 @@ object ProtoConverters { case filodb.query.MiscellaneousFunctionId.LabelJoin => GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN case filodb.query.MiscellaneousFunctionId.HistToPromVectors => GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS + case filodb.query.MiscellaneousFunctionId.OptimizeWithAgg => + GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG } function } @@ -1087,6 +1089,8 @@ object ProtoConverters { case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN => filodb.query.MiscellaneousFunctionId.LabelJoin case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS => filodb.query.MiscellaneousFunctionId.HistToPromVectors + case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG => + filodb.query.MiscellaneousFunctionId.OptimizeWithAgg case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.UNRECOGNIZED => throw new IllegalArgumentException(s"Unrecognized MiscellaneousFunctionId ${f}") } diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala index 7d011421bc..5ad1b33096 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers import filodb.prometheus.ast.TimeStepParams import filodb.prometheus.parse.Parser import filodb.query.LogicalPlan.getColumnFilterGroup +import filodb.query.MiscellaneousFunctionId.OptimizeWithAgg import filodb.query.util.{ExcludeAggRule, HierarchicalQueryExperienceParams, IncludeAggRule} -import filodb.query.{Aggregate, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} +import filodb.query.{Aggregate, ApplyMiscellaneousFunction, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} class LogicalPlanParserSpec extends AnyFunSpec with Matchers { @@ -1037,4 +1038,31 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true) // not updated updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated } + + it("should correctly apply optimize_with_agg function to a Prometheus query") { + val timeParamsSec1 = TimeStepParams(1000, 10, 10000) + val query1 = """optimize_with_agg(sum(rate(mns_gmail_authenticate_request_ms{_ws_="acs-icloud",_ns_="mail-notifications",app="mail-notifications",env="prod",_type_="prom-histogram"}[5m])))""" + val lp1 = Parser.queryRangeToLogicalPlan(query1, timeParamsSec1) + lp1.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp1.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp1) shouldEqual "ApplyMiscellaneousFunction(Aggregate(PeriodicSeriesWithWindowing(RawSeries)))" + + + val timeParamsSec2 = TimeStepParams(1000, 10, 10000) + val query2 = """optimize_with_agg(sum(my_gauge:::suffix1{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) without (excludeTag1, excludeTag2))""" + val lp2 = Parser.queryRangeToLogicalPlan(query2, timeParamsSec2) + lp2.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp2.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + var query = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))" + + query = "" + val timeParamsSec3 = TimeStepParams(1000, 10, 10000) + val query3 = """optimize_with_agg(sum(my_gauge:::no_rule{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) by (notExcludeTag1))""" + val lp3 = Parser.queryRangeToLogicalPlan(query3, timeParamsSec3) + lp3.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp3.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + var query4 = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))" + } } diff --git a/grpc/src/main/protobuf/query_service.proto b/grpc/src/main/protobuf/query_service.proto index 3af700bcfa..6f779b5092 100644 --- a/grpc/src/main/protobuf/query_service.proto +++ b/grpc/src/main/protobuf/query_service.proto @@ -653,6 +653,7 @@ enum MiscellaneousFunctionId { LABEL_REPLACE = 0; LABEL_JOIN = 1; HIST_TO_PROM_VECTORS = 2; + OPTIMIZE_WITH_AGG = 3; } enum BinaryOperator { diff --git a/query/src/main/scala/filodb/query/PlanEnums.scala b/query/src/main/scala/filodb/query/PlanEnums.scala index 9333761b0b..c8fc1989f4 100644 --- a/query/src/main/scala/filodb/query/PlanEnums.scala +++ b/query/src/main/scala/filodb/query/PlanEnums.scala @@ -177,6 +177,7 @@ object MiscellaneousFunctionId extends Enum[MiscellaneousFunctionId] { case object LabelReplace extends MiscellaneousFunctionId("label_replace") case object LabelJoin extends MiscellaneousFunctionId("label_join") case object HistToPromVectors extends MiscellaneousFunctionId("hist_to_prom_vectors") + case object OptimizeWithAgg extends MiscellaneousFunctionId("optimize_with_agg") } sealed abstract class SortFunctionId(override val entryName: String) extends EnumEntry