Skip to content

Commit

Permalink
Added test cases and ignore materialize for no-op
Browse files Browse the repository at this point in the history
  • Loading branch information
mference committed Jan 3, 2025
1 parent 7347111 commit 75c8e3b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package filodb.coordinator.queryplanner


import java.util.concurrent.ThreadLocalRandom

import akka.serialization.SerializationExtension
import com.typesafe.scalalogging.StrictLogging

import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher}
import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
import filodb.core.query._
Expand Down Expand Up @@ -233,11 +231,21 @@ trait DefaultPlanner {
lp: ApplyMiscellaneousFunction,
forceInProcess: Boolean = false): PlanResult = {
val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess)
if (lp.function == MiscellaneousFunctionId.HistToPromVectors)
vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part)))
else
vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs)))
vectors
if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg)
{
// The `Optimize` function is a no-operation (no-op), meaning it does not perform any transformation or
// computation. However, it is necessary to pass it through to the execution plan without any modifications when
// dealing with an "Optimize with Aggregation" query. This ensures that the optimization logic is preserved
// and applied correctly during the aggregation phase, without interfering with the underlying data
// or query execution flow.
vectors
}else{
if (lp.function == MiscellaneousFunctionId.HistToPromVectors)
vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part)))
else
vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs)))
vectors
}
}

def materializeApplyInstantFunctionRaw(qContext: QueryContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import org.scalatest.matchers.should.Matchers
import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.LogicalPlan.getColumnFilterGroup
import filodb.query.MiscellaneousFunctionId.OptimizeWithAgg
import filodb.query.util.{ExcludeAggRule, HierarchicalQueryExperienceParams, IncludeAggRule}
import filodb.query.{Aggregate, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters}
import filodb.query.{Aggregate, ApplyMiscellaneousFunction, BinaryJoin, IntervalSelector, RawSeries, SeriesKeysByFilters}

class LogicalPlanParserSpec extends AnyFunSpec with Matchers {

Expand Down Expand Up @@ -1037,4 +1038,31 @@ class LogicalPlanParserSpec extends AnyFunSpec with Matchers {
updatedMetricNamesSet.contains("my_gauge:::no_rule").shouldEqual(true) // not updated
updatedMetricNamesSet.contains("your_gauge:::no_rule2").shouldEqual(true) // not updated
}

it("should correctly apply optimize_with_agg function to a Prometheus query") {
val timeParamsSec1 = TimeStepParams(1000, 10, 10000)
val query1 = """optimize_with_agg(sum(rate(mns_gmail_authenticate_request_ms{_ws_="acs-icloud",_ns_="mail-notifications",app="mail-notifications",env="prod",_type_="prom-histogram"}[5m])))"""
val lp1 = Parser.queryRangeToLogicalPlan(query1, timeParamsSec1)
lp1.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true)
lp1.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg)
LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp1) shouldEqual "ApplyMiscellaneousFunction(Aggregate(PeriodicSeriesWithWindowing(RawSeries)))"


val timeParamsSec2 = TimeStepParams(1000, 10, 10000)
val query2 = """optimize_with_agg(sum(my_gauge:::suffix1{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) without (excludeTag1, excludeTag2))"""
val lp2 = Parser.queryRangeToLogicalPlan(query2, timeParamsSec2)
lp2.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true)
lp2.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg)
var query = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2)
LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp2) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))"

query = ""
val timeParamsSec3 = TimeStepParams(1000, 10, 10000)
val query3 = """optimize_with_agg(sum(my_gauge:::no_rule{includeTag1="spark", includeTag2="filodb"}) by (includeTag3, includeTag1) + sum(your_gauge:::suffix2{notExcludeTag1="spark", notExcludeTag2="filodb"}) by (notExcludeTag1))"""
val lp3 = Parser.queryRangeToLogicalPlan(query3, timeParamsSec3)
lp3.isInstanceOf[ApplyMiscellaneousFunction].shouldEqual(true)
lp3.asInstanceOf[ApplyMiscellaneousFunction].function .shouldEqual(OptimizeWithAgg)
var query4 = LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3)
LogicalPlanUtils.getLogicalPlanTreeStringRepresentation(lp3) shouldEqual "ApplyMiscellaneousFunction(BinaryJoin(Aggregate(PeriodicSeries(RawSeries)),Aggregate(PeriodicSeries(RawSeries))))"
}
}

0 comments on commit 75c8e3b

Please sign in to comment.