Skip to content

Commit

Permalink
feat(query): Adding support of optimize_with_agg logical plan to make…
Browse files Browse the repository at this point in the history
… use of aggregated metrics in HQE (#1923)

* Adding support for optmize-with-agg logical plan

Co-authored-by: mference <mference@apple.com>
Co-authored-by: sandeep6189 <sandeep6189@gmail.com>
  • Loading branch information
3 people authored Jan 4, 2025
1 parent ae90d49 commit 990dbf0
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import filodb.query.LogicalPlan._
import filodb.query.exec._
import filodb.query.exec.InternalRangeFunction.Last


//scalastyle:off file.size.limit
/**
* Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the
* plan building call tree during query planning.
Expand Down Expand Up @@ -234,11 +234,17 @@ trait DefaultPlanner {
lp: ApplyMiscellaneousFunction,
forceInProcess: Boolean = false): PlanResult = {
val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess)
if (lp.function == MiscellaneousFunctionId.HistToPromVectors)
vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part)))
else
vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs)))
vectors
if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) {
// Optimize with aggregation is a no-op, doing no transformation. It must pass through
// the execution plan to apply optimization logic correctly during aggregation.
vectors
} else {
if (lp.function == MiscellaneousFunctionId.HistToPromVectors)
vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part)))
else
vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs)))
vectors
}
}

def materializeApplyInstantFunctionRaw(qContext: QueryContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,8 @@ object ProtoConverters {
case filodb.query.MiscellaneousFunctionId.LabelJoin => GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN
case filodb.query.MiscellaneousFunctionId.HistToPromVectors =>
GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS
case filodb.query.MiscellaneousFunctionId.OptimizeWithAgg =>
GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG
}
function
}
Expand All @@ -1087,6 +1089,8 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN => filodb.query.MiscellaneousFunctionId.LabelJoin
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS =>
filodb.query.MiscellaneousFunctionId.HistToPromVectors
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG =>
filodb.query.MiscellaneousFunctionId.OptimizeWithAgg
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.UNRECOGNIZED =>
throw new IllegalArgumentException(s"Unrecognized MiscellaneousFunctionId ${f}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers
import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.LogicalPlan.getColumnFilterGroup
import filodb.query.MiscellaneousFunctionId.OptimizeWithAgg
import filodb.query.util.{ExcludeAggRule, HierarchicalQueryExperienceParams, IncludeAggRule}
import filodb.query.{Aggregate, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters}
import filodb.query.{Aggregate, ApplyMiscellaneousFunction, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters}

class LogicalPlanParserSpec extends AnyFunSpec with Matchers {

Expand Down Expand Up @@ -1037,4 +1038,31 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers {
updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true) // not updated
updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated
}

it("should correctly apply optimize_with_agg function to a Prometheus query") {
val timeParamsSec1 = TimeStepParams(1000, 10, 10000)
val query1 = """optimize_with_agg(sum(rate(mns_gmail_authenticate_request_ms{_ws_="acs-icloud",_ns_="mail-notifications",app="mail-notifications",env="prod",_type_="prom-histogram"}[5m])))"""
val lp1 = Parser.queryRangeToLogicalPlan(query1, timeParamsSec1)
lp1.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true)
lp1.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg)
LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp1) shouldEqual "ApplyMiscellaneousFunction(Aggregate(PeriodicSeriesWithWindowing(RawSeries)))"


val timeParamsSec2 = TimeStepParams(1000, 10, 10000)
val query2 = """optimize_with_agg(sum(my_gauge:::suffix1{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) without (excludeTag1, excludeTag2))"""
val lp2 = Parser.queryRangeToLogicalPlan(query2, timeParamsSec2)
lp2.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true)
lp2.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg)
var query = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2)
LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))"

query = ""
val timeParamsSec3 = TimeStepParams(1000, 10, 10000)
val query3 = """optimize_with_agg(sum(my_gauge:::no_rule{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) by (notExcludeTag1))"""
val lp3 = Parser.queryRangeToLogicalPlan(query3, timeParamsSec3)
lp3.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true)
lp3.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg)
var query4 = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3)
LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))"
}
}
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ enum MiscellaneousFunctionId {
LABEL_REPLACE = 0;
LABEL_JOIN = 1;
HIST_TO_PROM_VECTORS = 2;
OPTIMIZE_WITH_AGG = 3;
}

enum BinaryOperator {
Expand Down
1 change: 1 addition & 0 deletions query/src/main/scala/filodb/query/PlanEnums.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ object MiscellaneousFunctionId extends Enum[MiscellaneousFunctionId] {
case object LabelReplace extends MiscellaneousFunctionId("label_replace")
case object LabelJoin extends MiscellaneousFunctionId("label_join")
case object HistToPromVectors extends MiscellaneousFunctionId("hist_to_prom_vectors")
case object OptimizeWithAgg extends MiscellaneousFunctionId("optimize_with_agg")
}

sealed abstract class SortFunctionId(override val entryName: String) extends EnumEntry
Expand Down

0 comments on commit 990dbf0

Please sign in to comment.