Skip to content

Commit

Permalink
Setting rawSource to true and fixing the unit tests (#1920)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep6189 authored Jan 8, 2025
1 parent 7c49227 commit 0999aec
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,8 @@ trait DefaultPlanner {
} else lp

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
val rawSource = logicalPlanWithoutBucket.series.isRaw && (logicalPlanWithoutBucket.series match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
}) // the series is raw and supports raw export, its going to yield an iterator
// the series is raw and supports raw export, it's going to yield an iterator
val rawSource = logicalPlanWithoutBucket.series.isRaw

/* Last function is used to get the latest value in the window for absent_over_time
If no data is present AbsentFunctionMapper will return range vector with value 1 */
Expand Down Expand Up @@ -203,10 +201,7 @@ trait DefaultPlanner {
} else (None, None, lp)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
})
val rawSource = lpWithoutBucket.rawSeries.isRaw
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
window = None, functionId = None,
stepMultipleNotationUsed = false, funcParams = Nil,
Expand Down Expand Up @@ -832,7 +827,7 @@ object PlannerUtil extends StrictLogging {
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookbackMs).asInstanceOf[FunctionArgsPlan]),
series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookbackMs)
.asInstanceOf[RawSeriesLikePlan])
// wont bother rewriting and adjusting the start and end for metadata calls
// won't bother rewriting and adjusting the start and end for metadata calls
case lp: MetadataQueryPlan => lp
case lp: TsCardinalities => lp
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,23 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// MultiPartitionPlanner has capability to stitch across time partitions, however, the logic is mostly broken
// and not well tested. The logic below would not work well for any kind of subquery since their actual
// start and ends are different from the start/end parameter of the query context. If we are to implement
// stitching across time, we need to to pass proper parameters to getPartitions() call
// stitching across time, we need to pass proper parameters to getPartitions() call
if (forceInProcess) {
// If inprocess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// If InProcess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// raw series is doing a remote call to get all the data.
logicalPlan match {
case lp: RawSeries if lp.supportsRemoteDataCall=>
case lp: RawSeries if lp.supportsRemoteDataCall =>
val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val rs = lp.rangeSelector.asInstanceOf[IntervalSelector]

val (rawExportStart, rawExportEnd) =
(rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L))

val partition = getPartitions(lp, params)
assert(partition.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
val partitions = getPartitions(lp, params)
assert(partitions.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
// Raw export from both involved partitions for the entire time duration as the shard-key migration
// is not guaranteed to happen exactly at the time of split
val execPlans = partition.map(pa => {
val execPlans = partitions.map(pa => {
val (thisPartitionStartMs, thisPartitionEndMs) = (rawExportStart, rawExportEnd)
val timeRangeOverride = TimeRange(thisPartitionEndMs, thisPartitionEndMs)
val totalOffsetThisPartitionMs = thisPartitionEndMs - thisPartitionStartMs
Expand All @@ -155,7 +155,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
, inProcessPlanDispatcher, None,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]),
enableApproximatelyEqualCheck = queryConfig.routingConfig.enableApproximatelyEqualCheckInStitch)
}
}
)
)
case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess)
Expand Down Expand Up @@ -191,8 +191,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
generateRemoteExecParams(qContext, startMs, endMs)
}
// Single partition but remote, send the entire plan remotely
if (grpcEndpoint.isDefined && !(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) {
val endpoint = grpcEndpoint.get
val channel = channels.getOrElseUpdate(endpoint, GrpcCommonUtils.buildChannelFromEndpoint(endpoint))
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
Expand Down Expand Up @@ -388,6 +387,17 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PlanResult(execPlan:: Nil)
}

/**
* @param partitionName partition name
* @param grpcEndPoint grpc endpoint
* @return true if partition is enabled with an grpc endpoint
*/
private def isPartitionEnabledForGrpc(partitionName: String, grpcEndPoint: Option[String]): Boolean = {
grpcEndPoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))
}

/**
* If the argument partition is local, materialize the LogicalPlan with the local planner.
* Otherwise, create a PromQlRemoteExec.
Expand Down Expand Up @@ -415,9 +425,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
localPartitionPlanner.materialize(lpWithUpdatedTime, qContextWithOverride)
} else {
val ctx = generateRemoteExecParams(qContextWithOverride, timeRange.startMs, timeRange.endMs)
if (grpcEndpoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) {
val channel = channels.getOrElseUpdate(grpcEndpoint.get,
GrpcCommonUtils.buildChannelFromEndpoint(grpcEndpoint.get))
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
Expand Down Expand Up @@ -586,10 +594,10 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val assignmentRanges = getAssignmentQueryRanges(partitions, timeRange,
lookbackMs = lookbackMs, offsetMs = offsetMs, stepMsOpt = stepMsOpt)
val execPlans = if (assignmentRanges.isEmpty) {
// Assignment ranges empty means we cant run this query fully on one partition and needs
// remote raw export Check if the total time of raw export is within the limits, if not return Empty result
// Assignment ranges empty means we can't run this query fully on one partition and needs
// remote raw export. Check if the total time of raw export is within the limits, if not return Empty result.
// While it may seem we don't tune the lookback of the leaf raw queries to exactly what we need from each
// partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most
// partition, in reality it doesn't matter as despite a longer lookback, the actual data exported will be at most
// what that partition contains.
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs)
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs.max
Expand All @@ -609,7 +617,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
}
Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher))
}
} else {
}
else {
// materialize a plan for each range/assignment pair
val (_, execPlans) = assignmentRanges.foldLeft(
(None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
Expand Down Expand Up @@ -652,14 +661,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// leaf logical plan) with supportsRemoteDataCall = true figure out if this range can entirely be selected
// from partition p1 or p2
//

// Do not perform raw exports if the export is beyond a certain value for example
// foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure
// and OOMs. The max cross partition raw export config can control such queries from bring the process
// down but simpler queries with few minutes or even hour or two of lookback/offset will continue to work
// seamlessly with no data gaps
// Note that at the moment, while planning, we only can look at whats the max time range we can support.
// We still dont have capabilities to check the expected number of timeseries scanned or bytes scanned
// Note that at the moment, while planning, we only can look at what's the max time range we can support.
// We still don't have capabilities to check the expected number of timeseries scanned or bytes scanned
// and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts
// scanned goes high isn't available. To start with the time range scanned as a static configuration will
// be good enough and can be enhanced in future as required.
Expand Down
Loading

0 comments on commit 0999aec

Please sign in to comment.