Skip to content

Commit

Permalink
Support ansi offset clause
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Feb 1, 2020
1 parent 6a32d83 commit 7316cca
Show file tree
Hide file tree
Showing 27 changed files with 496 additions and 139 deletions.
1 change: 1 addition & 0 deletions docs/sql-keywords.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ Below is a list of all the keywords in Spark SQL.
<tr><td>NULL</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>NULLS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>OF</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>OFFSET</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>ON</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>
<tr><td>ONLY</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>OPTION</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ queryOrganization
(SORT BY sort+=sortItem (',' sort+=sortItem)*)?
windowClause?
(LIMIT (ALL | limit=expression))?
(OFFSET offset=expression)?
;

multiInsertQueryBody
Expand Down Expand Up @@ -1338,6 +1339,7 @@ nonReserved
| NULL
| NULLS
| OF
| OFFSET
| ONLY
| OPTION
| OPTIONS
Expand Down Expand Up @@ -1595,6 +1597,7 @@ NOT: 'NOT' | '!';
NULL: 'NULL';
NULLS: 'NULLS';
OF: 'OF';
OFFSET: 'OFFSET';
ON: 'ON';
ONLY: 'ONLY';
OPTION: 'OPTION';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,36 @@ trait CheckAnalysis extends PredicateHelper {
}
}

case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case GlobalLimit(limitExpr, _, _) => checkLimitLikeClause("limit", limitExpr)

case LocalLimit(limitExpr, _, child) =>
checkLimitLikeClause("limit", limitExpr)
child match {
case Offset(offsetExpr, _) =>
val limit = limitExpr.eval().asInstanceOf[Int]
val offset = offsetExpr.eval().asInstanceOf[Int]
if (Int.MaxValue - limit < offset) {
failAnalysis(
s"""The sum of limit and offset must not be greater than Int.MaxValue,
| but found limit = $limit, offset = $offset.""".stripMargin)
}
case _ =>
}

case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
failAnalysis(
s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
| clause found in: ${o.nodeName}. If you know exactly that OFFSET clause
| does not cause excessive overhead and still want to use it, set
| ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
}

case _: Union | _: SetOperation if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
def ordinalNumber(i: Int): String = i match {
Expand Down Expand Up @@ -630,6 +654,7 @@ trait CheckAnalysis extends PredicateHelper {
}
}
checkCollectedMetrics(plan)
checkOutermostOffset(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
Expand Down Expand Up @@ -755,6 +780,24 @@ trait CheckAnalysis extends PredicateHelper {
check(plan)
}

/**
 * Rejects a plan whose root node is a bare [[Offset]].
 *
 * When the root of `plan` is an [[Offset]], its offset expression is first validated with
 * `checkLimitLikeClause`; analysis then fails unless
 * `SQLConf.get.forceUsingOffsetWithoutLimit` is enabled. Any other root node passes
 * through without further checks.
 *
 * @param plan the plan whose root node is inspected
 */
private def checkOutermostOffset(plan: LogicalPlan): Unit = plan match {
  case Offset(offsetExpr, _) =>
    // Validate the expression before consulting the configuration, so an invalid offset
    // is reported even when a bare OFFSET is otherwise permitted.
    checkLimitLikeClause("offset", offsetExpr)
    val offsetWithoutLimitAllowed = SQLConf.get.forceUsingOffsetWithoutLimit
    if (!offsetWithoutLimitAllowed) {
      failAnalysis(
        s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
           | clause is found to be the outermost node. If you know exactly that OFFSET
           | clause does not cause excessive overhead and still want to use it, set
           | ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
    }
  case _ => // any other root node needs no offset-specific validation
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,13 @@ object UnsupportedOperationChecker extends Logging {
case GroupingSets(_, _, child, _) if child.isStreaming =>
throwError("GroupingSets is not supported on streaming DataFrames/Datasets")

case GlobalLimit(_, _) | LocalLimit(_, _)
case GlobalLimit(_, _, _) | LocalLimit(_, _, _)
if subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
"output mode")

case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")

case Sort(_, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@ package object dsl {

def deserialize[T : Encoder]: LogicalPlan = CatalystSerde.deserialize[T](logicalPlan)

def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)
def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, child = logicalPlan)

def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)

def join(
otherPlan: LogicalPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
CollapseWindow,
CombineFilters,
CombineLimits,
RewriteOffsets,
CombineUnions,
// Constant folding and strength reduction
TransposeWindow,
Expand Down Expand Up @@ -451,21 +452,24 @@ object LimitPushDown extends Rule[LogicalPlan] {

private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = {
plan match {
case GlobalLimit(_, child) => child
case GlobalLimit(_, _, child) => child
case _ => plan
}
}

private def maybePushLocalLimit(limitExp: Expression, plan: LogicalPlan): LogicalPlan = {
private def maybePushLocalLimit(
limitExp: Expression,
offsetExp: Expression,
plan: LogicalPlan): LogicalPlan = {
(limitExp, plan.maxRowsPerPartition) match {
case (IntegerLiteral(newLimit), Some(childMaxRows)) if newLimit < childMaxRows =>
// If the child has a cap on max rows per partition and the cap is larger than
// the new limit, put a new LocalLimit there.
LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
LocalLimit(limitExp, offsetExp, stripGlobalLimitIfPresent(plan))

case (_, None) =>
// If the child has no cap, put the new LocalLimit.
LocalLimit(limitExp, stripGlobalLimitIfPresent(plan))
LocalLimit(limitExp, offsetExp, stripGlobalLimitIfPresent(plan))

case _ =>
// Otherwise, don't put a new LocalLimit.
Expand All @@ -480,22 +484,22 @@ object LimitPushDown extends Rule[LogicalPlan] {
// Note: right now Union means UNION ALL, which does not de-duplicate rows, so it is safe to
// pushdown Limit through it. Once we add UNION DISTINCT, however, we will not be able to
// pushdown Limit.
case LocalLimit(exp, Union(children)) =>
LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _))))
case LocalLimit(le, oe, Union(children)) =>
LocalLimit(le, oe, Union(children.map(maybePushLocalLimit(le, oe, _))))
// Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
// the left and right sides, respectively. It's not safe to push limits below FULL OUTER
// JOIN in the general case without a more invasive rewrite.
// We also need to ensure that this limit pushdown rule will not eventually introduce limits
// on both sides if it is applied multiple times. Therefore:
// - If one side is already limited, stack another limit on top if the new limit is smaller.
// The redundant limit will be collapsed by the CombineLimits rule.
case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
case LocalLimit(le, oe, join @ Join(left, right, joinType, _, _)) =>
val newJoin = joinType match {
case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
case RightOuter => join.copy(right = maybePushLocalLimit(le, oe, right))
case LeftOuter => join.copy(left = maybePushLocalLimit(le, oe, left))
case _ => join
}
LocalLimit(exp, newJoin)
LocalLimit(le, oe, newJoin)
}
}

Expand Down Expand Up @@ -710,11 +714,11 @@ object CollapseProject extends Rule[LogicalPlan] {
agg.copy(aggregateExpressions = buildCleanedProjectList(
p.projectList, agg.aggregateExpressions))
}
case Project(l1, g @ GlobalLimit(_, limit @ LocalLimit(_, p2 @ Project(l2, _))))
case Project(l1, g @ GlobalLimit(_, _, limit @ LocalLimit(_, _, p2 @ Project(l2, _))))
if isRenaming(l1, l2) =>
val newProjectList = buildCleanedProjectList(l1, l2)
g.copy(child = limit.copy(child = p2.copy(projectList = newProjectList)))
case Project(l1, limit @ LocalLimit(_, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
case Project(l1, limit @ LocalLimit(_, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
val newProjectList = buildCleanedProjectList(l1, l2)
limit.copy(child = p2.copy(projectList = newProjectList))
case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
Expand Down Expand Up @@ -1382,12 +1386,29 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
*/
object CombineLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
GlobalLimit(Least(Seq(ne, le)), grandChild)
case LocalLimit(le, LocalLimit(ne, grandChild)) =>
LocalLimit(Least(Seq(ne, le)), grandChild)
case Limit(le, Limit(ne, grandChild)) =>
Limit(Least(Seq(ne, le)), grandChild)
case GlobalLimit(le, oe, GlobalLimit(nle, noe, grandChild)) =>
GlobalLimit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
case LocalLimit(le, oe, LocalLimit(nle, noe, grandChild)) =>
LocalLimit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
case Limit(le, oe, Limit(nle, noe, grandChild)) =>
Limit(Least(Seq(nle, le)), Greatest(Seq(noe, oe)), grandChild)
}
}

/**
 * Folds [[Offset]] operators into adjacent limit operators, merges two adjacent [[Offset]]
 * operators into a single one, and rewrites a remaining literal-valued [[Offset]] as a
 * [[Limit]] whose limit expression is `Limit.INVALID_LIMIT`.
 */
object RewriteOffsets extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // A global limit sitting directly on an Offset absorbs the offset expression.
    // NOTE(review): Greatest equals the inner offset only while the limit's own offset
    // is 0; confirm whether a non-zero outer offset should be summed here as well.
    case GlobalLimit(le, oe, Offset(noe, grandChild)) =>
      GlobalLimit(le, Greatest(Seq(noe, oe)), grandChild)
    // A local limit sitting directly on an Offset records the offset and lifts the Offset
    // node above itself (presumably so an enclosing GlobalLimit can absorb it on a later
    // pass -- confirm). The NOTE(review) above applies to this Greatest too.
    case LocalLimit(le, oe, Offset(noe, grandChild)) =>
      Offset(noe, LocalLimit(le, Greatest(Seq(noe, oe)), grandChild))
    // Stacked offsets skip rows cumulatively: skipping `noe` rows and then `oe` more
    // skips `noe + oe` rows in total, so the expressions are added, not maximised.
    case Offset(oe, Offset(noe, grandChild)) =>
      Offset(Add(noe, oe), grandChild)
    // Only an integer literal offset is rewritten; the literal's value is not inspected.
    case Offset(oe @ Literal(_, IntegerType), child) =>
      Limit(Limit.INVALID_LIMIT, oe, child)
  }
}

Expand Down Expand Up @@ -1495,8 +1516,11 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
projection.initialize(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)
case Limit(
IntegerLiteral(limit),
IntegerLiteral(offset),
LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.drop(offset).take(limit), isStreaming)

case Filter(condition, LocalRelation(output, data, isStreaming))
if !hasUnevaluableExpr(condition) =>
Expand Down Expand Up @@ -1779,7 +1803,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
// changes up the Logical Plan.
//
// Replace Global Limit 0 nodes with empty Local Relation
case gl @ GlobalLimit(IntegerLiteral(0), _) =>
case gl @ GlobalLimit(IntegerLiteral(0), _, _) =>
empty(gl)

// Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a
Expand All @@ -1788,7 +1812,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
// then the following rule will handle that case as well.
//
// Replace Local Limit 0 nodes with empty Local Relation
case ll @ LocalLimit(IntegerLiteral(0), _) =>
case ll @ LocalLimit(IntegerLiteral(0), _, _) =>
empty(ll)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
case _: Sort => empty(p)
case _: GlobalLimit => empty(p)
case _: LocalLimit => empty(p)
case _: Offset => empty(p)
case _: Repartition => empty(p)
case _: RepartitionByExpression => empty(p)
// An aggregate with non-empty group expression will return one output row per group when the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case _: Sample => true
case _: GlobalLimit => true
case _: LocalLimit => true
case _: Offset => true
case _: Generate => true
case _: Distinct => true
case _: AppendColumns => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
case exists: Exists if exists.children.isEmpty =>
IsNotNull(
ScalarSubquery(
plan = Limit(Literal(1), Project(Seq(Alias(Literal(1), "col")()), exists.plan)),
plan = Limit(Literal(1), child = Project(Seq(Alias(Literal(1), "col")()), exists.plan)),
exprId = exists.exprId))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val withOffset = withWindow.optional(offset) {
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withWindow.optional(limit) {
Limit(typedVisit(limit), withWindow)
withOffset.optional(limit) {
Limit(typedVisit(limit), Literal(0), withOffset)
}
}

Expand Down Expand Up @@ -1001,7 +1007,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

ctx.sampleMethod() match {
case ctx: SampleByRowsContext =>
Limit(expression(ctx.expression), query)
Limit(expression(ctx.expression), child = query)

case ctx: SampleByPercentileContext =>
val fraction = ctx.percentage.getText.toDouble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trait LogicalPlanVisitor[T] {
case p: Filter => visitFilter(p)
case p: Generate => visitGenerate(p)
case p: GlobalLimit => visitGlobalLimit(p)
case p: Offset => visitOffset(p)
case p: Intersect => visitIntersect(p)
case p: Join => visitJoin(p)
case p: LocalLimit => visitLocalLimit(p)
Expand Down Expand Up @@ -60,6 +61,8 @@ trait LogicalPlanVisitor[T] {

def visitGlobalLimit(p: GlobalLimit): T

def visitOffset(p: Offset): T

def visitIntersect(p: Intersect): T

def visitJoin(p: Join): T
Expand Down
Loading

0 comments on commit 7316cca

Please sign in to comment.