Merge branch 'develop' into integration
Yu Zhang committed Dec 17, 2024
2 parents 182de80 + 6cc8a24 commit 4a205c1
Showing 16 changed files with 384 additions and 198 deletions.
@@ -567,30 +567,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProvider
}}
}

-  /**
-   * Throws a BadQueryException if any of the following conditions hold:
-   *   (1) the plan spans more than one non-metric shard key prefix.
-   *   (2) the plan contains at least one BinaryJoin, and any of its BinaryJoins contain an offset.
-   * @param splitLeafPlan must contain leaf plans that individually span multiple partitions.
-   */
-  private def validateSplitLeafPlan(splitLeafPlan: LogicalPlan): Unit = {
-    val baseErrorMessage = "This query contains selectors that individually read data from multiple partitions. " +
-      "This is likely because a selector's data was migrated between partitions. "
-    if (hasBinaryJoin(splitLeafPlan) && getOffsetMillis(splitLeafPlan).exists(_ > 0)) {
-      throw new BadQueryException( baseErrorMessage +
-        "These \"split\" queries cannot contain binary joins with offsets."
-      )
-    }
-    lazy val hasMoreThanOneNonMetricShardKey =
-      LogicalPlanUtils.resolvePipeConcatenatedShardKeyFilters(splitLeafPlan, dataset.options.nonMetricShardColumns)
-        .filter(_.nonEmpty).distinct.size > 1
-    if (hasMoreThanOneNonMetricShardKey) {
-      throw new BadQueryException( baseErrorMessage +
-        "These \"split\" queries are not supported if they contain multiple non-metric shard keys."
-      )
-    }
-  }
-
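As an aside on the removed check: below is a minimal, self-contained sketch of condition (1), the distinct-shard-key test. ShardKeyFilters and validateShardKeys are hypothetical stand-ins (not FiloDB APIs), and IllegalArgumentException stands in for BadQueryException to keep the snippet dependency-free.

object SplitLeafValidationSketch {
  // e.g. Seq("_ws_" -> "demo", "_ns_" -> "app1"); hypothetical stand-in type
  type ShardKeyFilters = Seq[(String, String)]

  // Mirrors condition (1) above: reject when the selectors resolve to more
  // than one distinct, non-empty set of non-metric shard-key filters.
  def validateShardKeys(resolvedFilters: Seq[ShardKeyFilters]): Unit = {
    val distinctKeys = resolvedFilters.filter(_.nonEmpty).distinct
    if (distinctKeys.size > 1)
      throw new IllegalArgumentException(
        s"split query spans ${distinctKeys.size} non-metric shard keys: $distinctKeys")
  }

  def main(args: Array[String]): Unit = {
    // Same shard-key prefix in both selectors: accepted.
    validateShardKeys(Seq(
      Seq("_ws_" -> "demo", "_ns_" -> "app1"),
      Seq("_ws_" -> "demo", "_ns_" -> "app1")))
    // Two different prefixes: rejected, as in the planner check above.
    try validateShardKeys(Seq(
      Seq("_ws_" -> "demo", "_ns_" -> "app1"),
      Seq("_ws_" -> "demo", "_ns_" -> "app2")))
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}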
  /**
   * Materializes a LogicalPlan with leaves that individually span multiple partitions.
   * All "split-leaf" plans will fail to materialize (throw a BadQueryException) if they
@@ -601,9 +577,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProvider
  //scalastyle:off method.length
  private def materializeSplitLeafPlan(logicalPlan: LogicalPlan,
                                       qContext: QueryContext): PlanResult = {
-    // TODO: Reassess this validate, we should also support binary joins in split leaf as long as they are within
-    //   the limits of max range of data exported
-    validateSplitLeafPlan(logicalPlan)
    val qParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
    // get a mapping of assignments to time-ranges to query
    val lookbackMs = getLookBackMillis(logicalPlan).max
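For intuition on lookbackMs above: a small, self-contained sketch of why the planner takes the maximum lookback across all leaves when splitting a query's time range at a partition boundary. TimeRange and splitRange are hypothetical stand-ins, not FiloDB APIs.

object LookbackSplitSketch {
  final case class TimeRange(startMs: Long, endMs: Long)

  // Split [startMs, endMs] at a partition-migration boundary. The second
  // range starts lookbackMs after the boundary, so every instant it covers
  // has its full window [t - lookbackMs, t] inside the second partition.
  def splitRange(r: TimeRange, boundaryMs: Long, lookbackMs: Long): Seq[TimeRange] =
    if (boundaryMs <= r.startMs || boundaryMs >= r.endMs) Seq(r)
    else Seq(TimeRange(r.startMs, boundaryMs - 1), TimeRange(boundaryMs + lookbackMs, r.endMs))

  def main(args: Array[String]): Unit = {
    // Two leaves with 5m and 10m windows: pad with the larger one, as in
    // getLookBackMillis(logicalPlan).max above.
    val lookbackMs = Seq(300000L, 600000L).max
    println(splitRange(TimeRange(0L, 3600000L), 1800000L, lookbackMs))
  }
}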
@@ -980,6 +980,7 @@ object ProtoConverters {
      case InternalRangeFunction.AvgWithSumAndCountOverTime =>
        GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
      case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
+     case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
      case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
      case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
      case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
@@ -1017,6 +1018,7 @@ object ProtoConverters {
      case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
        InternalRangeFunction.AvgWithSumAndCountOverTime
      case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
+     case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
      case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
      case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
      case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
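The RATE_AND_MIN_MAX_OVER_TIME additions above extend both directions of the proto mapping. Below is a minimal sketch of the invariant this preserves — a serialize/deserialize round trip returns the original function — using simplified stand-in enums rather than the generated gRPC classes:

object RangeFunctionRoundTripSketch {
  sealed trait InternalRangeFunction
  case object RateAndMinMaxOverTime extends InternalRangeFunction
  case object SumAndMaxOverTime extends InternalRangeFunction

  sealed trait ProtoRangeFunction
  case object RATE_AND_MIN_MAX_OVER_TIME extends ProtoRangeFunction
  case object SUM_AND_MAX_OVER_TIME extends ProtoRangeFunction

  def toProto(f: InternalRangeFunction): ProtoRangeFunction = f match {
    case RateAndMinMaxOverTime => RATE_AND_MIN_MAX_OVER_TIME
    case SumAndMaxOverTime     => SUM_AND_MAX_OVER_TIME
  }

  def fromProto(p: ProtoRangeFunction): InternalRangeFunction = p match {
    case RATE_AND_MIN_MAX_OVER_TIME => RateAndMinMaxOverTime
    case SUM_AND_MAX_OVER_TIME      => SumAndMaxOverTime
  }

  def main(args: Array[String]): Unit = {
    val fns = Seq(RateAndMinMaxOverTime, SumAndMaxOverTime)
    // A case added to only one direction would surface here as a MatchError.
    fns.foreach(f => assert(fromProto(toProto(f)) == f))
    println("round-trip OK")
  }
}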
@@ -15,7 +15,7 @@ import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query.BinaryOperator.{ADD, LAND}
import filodb.query.InstantFunctionId.Ln
-import filodb.query.{BadQueryException, LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
+import filodb.query.{LabelCardinality, LogicalPlan, PlanValidationSpec, SeriesKeysByFilters, TsCardinalities}
import filodb.query.exec._

class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec{
@@ -1712,18 +1712,6 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValidationSpec
    validatePlan(execPlan2, expectedPlanWithRemoteExec1)

-
-    val query4 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m] offset 5m))"
-    val lp4 = Parser.queryRangeToLogicalPlan(query4, TimeStepParams(2000, stepSecs, 10000))
-
-    val promQlQueryParams4 = PromQlQueryParams(query4, 1000, 100, 10000)
-    intercept[BadQueryException] {
-      // Expecting to see an Exception when we use a BinaryJoin with offsets; technically this too should not be a
-      // big deal, as we only need to identify the right window. However, this was not supported even before the
-      // change, so it is ok to leave it unaddressed in the first phase as it's just binary joins with offsets.
-      engine.materialize(lp4, QueryContext(origQueryParams = promQlQueryParams4, plannerParams =
-        PlannerParams(processMultiPartition = true)))
-    }

    // Planner with period of uncertainty should still generate steps that are aligned with start and step,
    // that is, should be snapped correctly
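A tiny sketch of the snapping the comment above refers to: every generated sub-query boundary must land on start + k*step. snapToStep is a hypothetical helper, not a FiloDB API.

object StepSnapSketch {
  // Round tsSecs up to the next instant aligned with start + k*step.
  def snapToStep(tsSecs: Long, startSecs: Long, stepSecs: Long): Long =
    if (tsSecs <= startSecs) startSecs
    else startSecs + ((tsSecs - startSecs + stepSecs - 1) / stepSecs) * stepSecs

  def main(args: Array[String]): Unit = {
    // With start=2000 and step=100 (as in TimeStepParams(2000, stepSecs, 10000)
    // above, assuming stepSecs == 100), a boundary at 2730 snaps to 2800.
    assert(snapToStep(2730L, 2000L, 100L) == 2800L)
    println(snapToStep(2730L, 2000L, 100L))
  }
}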
