diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index b9be8d5906..1ee27ffeaa 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -2,10 +2,8 @@ package filodb.coordinator.queryplanner import java.util.concurrent.ThreadLocalRandom - import akka.serialization.SerializationExtension import com.typesafe.scalalogging.StrictLogging - import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher} import filodb.core.metadata.{Dataset, DatasetOptions, Schemas} import filodb.core.query._ @@ -233,11 +231,21 @@ trait DefaultPlanner { lp: ApplyMiscellaneousFunction, forceInProcess: Boolean = false): PlanResult = { val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - if (lp.function == MiscellaneousFunctionId.HistToPromVectors) - vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) - else - vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) - vectors + if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) + { + // The `Optimize` function is a no-operation (no-op), meaning it does not perform any transformation or + // computation. However, it is necessary to pass it through to the execution plan without any modifications when + // dealing with an "Optimize with Aggregation" query. This ensures that the optimization logic is preserved + // and applied correctly during the aggregation phase, without interfering with the underlying data + // or query execution flow. + vectors + }else{ + if (lp.function == MiscellaneousFunctionId.HistToPromVectors) + vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) + else + vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) + vectors + } } def materializeApplyInstantFunctionRaw(qContext: QueryContext, diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala index 7d011421bc..5ad1b33096 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LogicalPlanParserSpec.scala @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers import filodb.prometheus.ast.TimeStepParams import filodb.prometheus.parse.Parser import filodb.query.LogicalPlan.getColumnFilterGroup +import filodb.query.MiscellaneousFunctionId.OptimizeWithAgg import filodb.query.util.{ExcludeAggRule, HierarchicalQueryExperienceParams, IncludeAggRule} -import filodb.query.{Aggregate, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} +import filodb.query.{Aggregate, ApplyMiscellaneousFunction, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters} class LogicalPlanParserSpec extends AnyFunSpec with Matchers { @@ -1037,4 +1038,31 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers { updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true) // not updated updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated } + + it("should correctly apply optimize_with_agg function to a Prometheus query") { + val timeParamsSec1 = TimeStepParams(1000, 10, 10000) + val query1 = """optimize_with_agg(sum(rate(mns_gmail_authenticate_request_ms{_ws_="acs-icloud",_ns_="mail-notifications",app="mail-notifications",env="prod",_type_="prom-histogram"}[5m])))""" + val lp1 = Parser.queryRangeToLogicalPlan(query1, timeParamsSec1) + lp1.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp1.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp1) shouldEqual "ApplyMiscellaneousFunction(Aggregate(PeriodicSeriesWithWindowing(RawSeries)))" + + + val timeParamsSec2 = TimeStepParams(1000, 10, 10000) + val query2 = """optimize_with_agg(sum(my_gauge:::suffix1{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) without (excludeTag1, excludeTag2))""" + val lp2 = Parser.queryRangeToLogicalPlan(query2, timeParamsSec2) + lp2.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp2.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + var query = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))" + + query = "" + val timeParamsSec3 = TimeStepParams(1000, 10, 10000) + val query3 = """optimize_with_agg(sum(my_gauge:::no_rule{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) by (notExcludeTag1))""" + val lp3 = Parser.queryRangeToLogicalPlan(query3, timeParamsSec3) + lp3.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true) + lp3.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg) + var query4 = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) + LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))" + } }