Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): split query should be able to run when step > (end - start) #1924

Merged
merged 18 commits into from
Jan 21, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,19 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
timeRange.endMs)
case None => (lastTimeRange.endMs, timeRange.endMs)
}
val newParams = qParams.copy(startSecs = gapStartTimeMs / 1000, endSecs = gapEndTimeMs / 1000)
val newContext = qContext.copy(origQueryParams = newParams)
val newLp = rewritePlanWithRemoteRawExport(logicalPlan,
IntervalSelector(gapStartTimeMs, gapEndTimeMs),
additionalLookbackMs = 0L.max(gapStartTimeMs - lastTimeRange.startMs))
execPlans ++ walkLogicalPlanTree(newLp, newContext, forceInProcess = true).plans
if (gapStartTimeMs <= gapEndTimeMs){
Boyuan-Yu marked this conversation as resolved.
Show resolved Hide resolved
// The opposite happens when we snap a large step to the query start and the result/gapStartTimeMs is
// larger than the query end time/gapEndTimeMs. That means there is no gap so we skip this block of code
// for handling gap range
val newParams = qParams.copy(startSecs = gapStartTimeMs / 1000, endSecs = gapEndTimeMs / 1000)
val newContext = qContext.copy(origQueryParams = newParams)
val newLp = rewritePlanWithRemoteRawExport(logicalPlan,
IntervalSelector(gapStartTimeMs, gapEndTimeMs),
additionalLookbackMs = 0L.max(gapStartTimeMs - lastTimeRange.startMs))
execPlans ++ walkLogicalPlanTree(newLp, newContext, forceInProcess = true).plans
} else {
execPlans
}
} else {
execPlans
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,119 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS
validatePlan(root, test.expected)
}
}
it("(Step > End - Start) with split should handle gap range"){
val startSec = 0
val stepSec = 6000
val endSec = 9999
val splitSec1 = 5000
val query = """sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m]))"""
val expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{_ws_ = "demo", _ns_ = "localNs"}[30m])),0,6000,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6000,6000,9999))
|--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
|----T~PeriodicSamplesMapper(start=6000000, step=6000000, end=9999000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None)
|-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[11799s],9999,1,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[11799s],9999,1,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin
val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String],
timeRange: TimeRange): List[PartitionAssignment] = {
val splitMs1 = 1000 * splitSec1
List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs1)),
PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs1 + 1, timeRange.endMs)))
}
override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignment] =
throw new RuntimeException("should not use")
}
val engine = new MultiPartitionPlanner(
partitionLocationProvider, singlePartitionPlanner, "local",
MetricsTestData.timeseriesDataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(
supportRemoteRawExport = true,
maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS),
periodOfUncertaintyMs = 3000)))

val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec))
val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, endSec)
val execPlan = engine.materialize(lp,
QueryContext(origQueryParams = promQlQueryParams,
plannerParams = PlannerParams(processMultiPartition = true))
)
validatePlan(execPlan, expected)
}

it("(Step > End - Start) with split should handle gap range if start is in second partition"){
val startSec = 6000
val stepSec = 7000
val endSec = 9999
val splitSec1 = 5000
val query = """sum_over_time(test{_ws_ = "demo", _ns_ = "localNs"}[5500s])"""
val expected = """T~PeriodicSamplesMapper(start=6000000, step=7000000, end=9999000, window=Some(5500000), functionId=Some(SumOverTime), rawSource=false, offsetMs=None)
|-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|--E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[9499s],9999,1,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))
|--E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[9499s],9999,1,9999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin
val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String],
timeRange: TimeRange): List[PartitionAssignment] = {
val splitMs1 = 1000 * splitSec1
List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs1)),
PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs1 + 1, timeRange.endMs)))
}
override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignment] =
throw new RuntimeException("should not use")
}
val engine = new MultiPartitionPlanner(
partitionLocationProvider, singlePartitionPlanner, "local",
MetricsTestData.timeseriesDataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(
supportRemoteRawExport = true,
maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS),
periodOfUncertaintyMs = 3000)))

val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec))
val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, endSec)
val execPlan = engine.materialize(lp,
QueryContext(origQueryParams = promQlQueryParams,
plannerParams = PlannerParams(processMultiPartition = true))
)
validatePlan(execPlan, expected)
}

it("(Step > End - Start) with split should have one exec Plan for first partition if start is in first partition"){
val startSec = 0
val stepSec = 10000
val endSec = 9999
val splitSec1 = 5000
val query = """test{_ws_ = "demo", _ns_ = "localNs"}"""
val expected = """E~PromQlRemoteExec(PromQlQueryParams(test{_ws_ = "demo", _ns_ = "localNs"},0,10000,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))""".stripMargin

val partitionLocationProvider = new PartitionLocationProvider {
override def getPartitions(routingKey: Map[String, String],
timeRange: TimeRange): List[PartitionAssignment] = {
val splitMs1 = 1000 * splitSec1
List(PartitionAssignment("remote0", "remote0-url", TimeRange(timeRange.startMs, splitMs1)),
PartitionAssignment("remote1", "remote1-url", TimeRange(splitMs1 + 1, timeRange.endMs)))
}
override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
timeRange: TimeRange): List[PartitionAssignment] =
throw new RuntimeException("should not use")
}
val engine = new MultiPartitionPlanner(
partitionLocationProvider, singlePartitionPlanner, "local",
MetricsTestData.timeseriesDataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(
supportRemoteRawExport = true,
maxRemoteRawExportTimeRange = Duration(3, TimeUnit.DAYS),
periodOfUncertaintyMs = 3000)))

val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSec, stepSec, endSec))
val promQlQueryParams = PromQlQueryParams(query, startSec, stepSec, endSec)
val execPlan = engine.materialize(lp,
QueryContext(origQueryParams = promQlQueryParams,
plannerParams = PlannerParams(processMultiPartition = true))
)
validatePlan(execPlan, expected)
}

it ("should materialize split-partition queries with lookback and offset binary joins correctly"){
val startSec = 0
Expand Down