Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression #27429

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ Below is a list of all the keywords in Spark SQL.
<tr><td>NULL</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>NULLS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>OF</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>OFFSET</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>ON</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>
<tr><td>ONLY</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>OPTION</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ queryOrganization
(SORT BY sort+=sortItem (',' sort+=sortItem)*)?
windowClause?
(LIMIT (ALL | limit=expression))?
(OFFSET offset=expression)?
;

multiInsertQueryBody
Expand Down Expand Up @@ -1342,6 +1343,7 @@ nonReserved
| NULL
| NULLS
| OF
| OFFSET
| ONLY
| OPTION
| OPTIONS
Expand Down Expand Up @@ -1596,6 +1598,7 @@ NOT: 'NOT' | '!';
NULL: 'NULL';
NULLS: 'NULLS';
OF: 'OF';
OFFSET: 'OFFSET';
ON: 'ON';
ONLY: 'ONLY';
OPTION: 'OPTION';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,30 @@ trait CheckAnalysis extends PredicateHelper {

case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)

case LocalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case LocalLimit(limitExpr, child) =>
checkLimitLikeClause("limit", limitExpr)
child match {
case Offset(offsetExpr, _) =>
val limit = limitExpr.eval().asInstanceOf[Int]
val offset = offsetExpr.eval().asInstanceOf[Int]
if (Int.MaxValue - limit < offset) {
failAnalysis(
s"""The sum of limit and offset must not be greater than Int.MaxValue,
| but found limit = $limit, offset = $offset.""".stripMargin)
}
case _ =>
}

case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)

case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)

case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
&& o.children.exists(_.isInstanceOf[Offset]) =>
failAnalysis(
s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
| clause found in: ${o.nodeName}.""".stripMargin)

case _: Union | _: SetOperation if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
def ordinalNumber(i: Int): String = i match {
Expand Down Expand Up @@ -643,6 +663,7 @@ trait CheckAnalysis extends PredicateHelper {
}
}
checkCollectedMetrics(plan)
checkOutermostOffset(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
Expand Down Expand Up @@ -768,6 +789,20 @@ trait CheckAnalysis extends PredicateHelper {
check(plan)
}

/**
* Validate that the root node of query or subquery is [[Offset]].
*/
private def checkOutermostOffset(plan: LogicalPlan): Unit = {
plan match {
case Offset(offsetExpr, _) =>
checkLimitLikeClause("offset", offsetExpr)
failAnalysis(
s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
| clause is found to be the outermost node.""".stripMargin)
case _ =>
}
}

/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ object UnsupportedOperationChecker extends Logging {
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
"output mode")

case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")

case Sort(_, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ package object dsl {

def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)

def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)

def join(
otherPlan: LogicalPlan,
joinType: JoinType = Inner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
CollapseWindow,
CombineFilters,
CombineLimits,
RewriteOffsets,
CombineUnions,
// Constant folding and strength reduction
TransposeWindow,
Expand Down Expand Up @@ -1401,6 +1402,19 @@ object CombineLimits extends Rule[LogicalPlan] {
}
}

/**
* Rewrite [[Offset]] as [[Limit]] or combines two adjacent [[Offset]] operators into one,
* merging the expressions into one single expression.
*/
object RewriteOffsets extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case GlobalLimit(le, Offset(oe, grandChild)) =>
GlobalLimitAndOffset(le, oe, grandChild)
case LocalLimit(le, Offset(oe, grandChild)) =>
Offset(oe, LocalLimit(Add(le, oe), grandChild))
}
}

/**
* Check if there any cartesian products between joins of any type in the optimized plan tree.
* Throw an error if a cartesian product is found without an explicit cross join specified.
Expand Down Expand Up @@ -1506,7 +1520,7 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)
LocalRelation(output, data.take(limit), isStreaming)

case Filter(condition, LocalRelation(output, data, isStreaming))
if !hasUnevaluableExpr(condition) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
case _: Sort => empty(p)
case _: GlobalLimit => empty(p)
case _: LocalLimit => empty(p)
case _: Offset => empty(p)
case _: Repartition => empty(p)
case _: RepartitionByExpression => empty(p)
// An aggregate with non-empty group expression will return one output row per group when the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case _: Sample => true
case _: GlobalLimit => true
case _: LocalLimit => true
case _: Offset => true
case _: Generate => true
case _: Distinct => true
case _: AppendColumns => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val withOffset = withWindow.optional(offset) {
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withWindow.optional(limit) {
Limit(typedVisit(limit), withWindow)
withOffset.optional(limit) {
Limit(typedVisit(limit), withOffset)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trait LogicalPlanVisitor[T] {
case p: Filter => visitFilter(p)
case p: Generate => visitGenerate(p)
case p: GlobalLimit => visitGlobalLimit(p)
case p: Offset => visitOffset(p)
case p: Intersect => visitIntersect(p)
case p: Join => visitJoin(p)
case p: LocalLimit => visitLocalLimit(p)
Expand Down Expand Up @@ -60,6 +61,8 @@ trait LogicalPlanVisitor[T] {

def visitGlobalLimit(p: GlobalLimit): T

def visitOffset(p: Offset): T

def visitIntersect(p: Intersect): T

def visitJoin(p: Join): T
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,21 @@ case class GroupingSets(
override lazy val resolved: Boolean = false
}

/**
* A logical offset, which may removing a specified number of rows from the beginning of the
* output of child logical plan.
*/
case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
import scala.math.max
offsetExpr match {
case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) }
case _ => None
}
}
}

/**
* A constructor for creating a pivot, which will later be converted to a [[Project]]
* or an [[Aggregate]] during the query analysis.
Expand Down Expand Up @@ -827,6 +842,23 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
}
}

/**
* A global (coordinated) limit with offset. This operator can skip at most `offsetExpr` number and
* emit at most `limitExpr` number in total.
*/
case class GlobalLimitAndOffset(
limitExpr: Expression,
offsetExpr: Expression,
child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
limitExpr match {
case IntegerLiteral(limit) => Some(limit)
case _ => None
}
}
}

/**
* This is similar with [[Limit]] except:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)

override def visitOffset(p: Offset): Statistics = fallback(p)

override def visitIntersect(p: Intersect): Statistics = fallback(p)

override def visitJoin(p: Join): Statistics = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
rowCount = Some(rowCount))
}

override def visitOffset(p: Offset): Statistics = {
val offset = p.offsetExpr.eval().asInstanceOf[Int]
val childStats = p.child.stats
val rowCount: BigInt = childStats.rowCount.map(_.-(offset).max(0)).getOrElse(0)
Statistics(
sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
rowCount = Some(rowCount))
}

override def visitIntersect(p: Intersect): Statistics = {
val leftSize = p.left.stats.sizeInBytes
val rightSize = p.right.stats.sizeInBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,38 @@ class AnalysisErrorSuite extends AnalysisTest {
"The limit expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"an evaluated offset class must not be null",
testRelation.offset(Literal(null, IntegerType)),
"The evaluated offset expression must not be null, but got " :: Nil
)

errorTest(
"num_rows in offset clause must be equal to or greater than 0",
listRelation.offset(-1),
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
)

errorTest(
"OFFSET clause is outermost node",
testRelation.offset(Literal(10, IntegerType)),
"Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET" ::
"clause is found to be the outermost node." :: Nil
)

errorTest(
"OFFSET clause in other node",
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
"Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET" ::
"clause found in: Filter." :: Nil
)

errorTest(
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),
"The sum of limit and offset must not be greater than Int.MaxValue" :: Nil
)

errorTest(
"more than one generators in SELECT",
listRelation.select(Explode($"list"), Explode($"list")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ class TableIdentifierParserSuite extends SparkFunSuite with SQLHelper {
"null",
"nulls",
"of",
"offset",
"on",
"only",
"option",
Expand Down Expand Up @@ -580,6 +581,7 @@ class TableIdentifierParserSuite extends SparkFunSuite with SQLHelper {
"natural",
"not",
"null",
"offset",
"on",
"only",
"or",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = windowsStats)
}

test("offset estimation: offset < child's rowCount") {
val offset = Offset(Literal(2), plan)
checkStats(offset, Statistics(sizeInBytes = 96, rowCount = Some(8)))
}

test("offset estimation: offset > child's rowCount") {
val offset = Offset(Literal(20), plan)
checkStats(offset, Statistics(sizeInBytes = 1, rowCount = Some(0)))
}

test("offset estimation: offset = 0") {
val offset = Offset(Literal(0), plan)
// Offset is equal to zero, so Offset's stats is equal to its child's stats.
checkStats(offset, plan.stats.copy(attributeStats = AttributeMap(Nil)))
}

test("limit estimation: limit < child's rowCount") {
val localLimit = LocalLimit(Literal(2), plan)
val globalLimit = GlobalLimit(Literal(2), plan)
Expand Down
Loading