
[SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression #25416

Closed
@@ -399,6 +399,7 @@ queryOrganization
(SORT BY sort+=sortItem (',' sort+=sortItem)*)?
windowClause?
(LIMIT (ALL | limit=expression))?
(OFFSET offset=expression)?
Contributor

Does the SQL standard define the order of the LIMIT and OFFSET clauses? Here we force users to write the LIMIT clause first.

Contributor Author
@beliefer beliefer Oct 9, 2019

Yes, the SQL standard defines the order of the LIMIT and OFFSET clauses: LIMIT comes before OFFSET.
This is the opposite of MySQL's order; MySQL supports LIMIT 10,10 (offset first, then row count).
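
For illustration only (not part of this PR), a minimal query showing the clause order the grammar above accepts, assuming a SparkSession named spark and a table t with a column id:

  // With this grammar, LIMIT must come before OFFSET in the query text.
  spark.sql("SELECT * FROM t ORDER BY id LIMIT 10 OFFSET 5")
  // MySQL's shorthand LIMIT 5, 10 (offset first, then count) is a different syntax
  // and is not covered by this rule.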

;

multiInsertQueryBody
@@ -1008,6 +1009,7 @@ ansiNonReserved
| NO
| NULLS
| OF
| OFFSET
| OPTION
| OPTIONS
| OUT
@@ -1259,6 +1261,7 @@ nonReserved
| NULL
| NULLS
| OF
| OFFSET
| ONLY
| OPTION
| OPTIONS
@@ -1520,6 +1523,7 @@ NOT: 'NOT' | '!';
NULL: 'NULL';
NULLS: 'NULLS';
OF: 'OF';
OFFSET: 'OFFSET';
Contributor

We should also update doc/sql-keywords.md

ON: 'ON';
ONLY: 'ONLY';
OPTION: 'OPTION';
@@ -65,20 +65,19 @@ trait CheckAnalysis extends PredicateHelper {
case _ => None
}

private def checkLimitClause(limitExpr: Expression): Unit = {
limitExpr match {
private def checkLimitOrOffsetClause(expr: Expression, name: String): Unit = {
expr match {
case e if !e.foldable => failAnalysis(
"The limit expression must evaluate to a constant value, but got " +
limitExpr.sql)
s"The $name expression must evaluate to a constant value, but got ${expr.sql}")
case e if e.dataType != IntegerType => failAnalysis(
s"The limit expression must be integer type, but got " +
s"The $name expression must be integer type, but got " +
e.dataType.catalogString)
case e =>
e.eval() match {
case null => failAnalysis(
s"The evaluated limit expression must not be null, but got ${limitExpr.sql}")
s"The evaluated $name expression must not be null, but got ${expr.sql}")
case v: Int if v < 0 => failAnalysis(
s"The limit expression must be equal to or greater than 0, but got $v")
s"The $name expression must be equal to or greater than 0, but got $v")
case _ => // OK
}
}
@@ -264,9 +263,11 @@ trait CheckAnalysis extends PredicateHelper {
}
}

case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr)
case GlobalLimit(limitExpr, _) => checkLimitOrOffsetClause(limitExpr, "limit")

case LocalLimit(limitExpr, _) => checkLimitOrOffsetClause(limitExpr, "limit")

case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr)
case Offset(offsetExpr, _) => checkLimitOrOffsetClause(offsetExpr, "offset")

case _: Union | _: SetOperation if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
@@ -319,6 +319,9 @@ object UnsupportedOperationChecker {
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
"output mode")

case Offset(_, _) =>
throwError("Offset is not supported on streaming DataFrames/Datasets")

case Sort(_, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")
@@ -319,6 +319,8 @@ package object dsl {

def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)

def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan)

def join(
otherPlan: LogicalPlan,
joinType: JoinType = Inner,
@@ -85,6 +85,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
case _: Filter => empty(p)
case _: Sample => empty(p)
case _: Sort => empty(p)
case _: Offset => empty(p)
case _: GlobalLimit => empty(p)
case _: LocalLimit => empty(p)
case _: Repartition => empty(p)
@@ -642,6 +642,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case _: Aggregate => true
case _: Window => true
case _: Sample => true
case _: Offset => true
case _: GlobalLimit => true
case _: LocalLimit => true
case _: Generate => true
@@ -422,10 +422,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val withOffset = withWindow.optional(offset) {
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withWindow.optional(limit) {
Limit(typedVisit(limit), withWindow)
withOffset.optional(limit) {
Limit(typedVisit(limit), withOffset)
}
}

@@ -29,6 +29,7 @@ trait LogicalPlanVisitor[T] {
case p: Expand => visitExpand(p)
case p: Filter => visitFilter(p)
case p: Generate => visitGenerate(p)
case p: Offset => visitOffset(p)
case p: GlobalLimit => visitGlobalLimit(p)
case p: Intersect => visitIntersect(p)
case p: Join => visitJoin(p)
@@ -58,6 +59,8 @@ trait LogicalPlanVisitor[T] {

def visitGenerate(p: Generate): T

def visitOffset(p: Offset): T

def visitGlobalLimit(p: GlobalLimit): T

def visitIntersect(p: Intersect): T
@@ -998,6 +998,20 @@ case class Pivot(
}
}

/**
* A global (coordinated) offset.
*/
case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = child.output
override def maxRows: Option[Long] = {
import scala.math.max
offsetExpr match {
case IntegerLiteral(offset) => child.maxRows.map { x => max(x - offset, 0) }
case _ => None
}
}
}

/**
* A constructor for creating a logical limit, which is split into two separate logical nodes:
* a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].
@@ -45,6 +45,8 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitGenerate(p: Generate): Statistics = fallback(p)

override def visitOffset(p: Offset): Statistics = fallback(p)

override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)

override def visitIntersect(p: Intersect): Statistics = fallback(p)
@@ -79,6 +79,15 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitGenerate(p: Generate): Statistics = default(p)

override def visitOffset(p: Offset): Statistics = {
val offset = p.offsetExpr.eval().asInstanceOf[Int]
val childStats = p.child.stats
val rowCount: BigInt = childStats.rowCount.map(_.-(offset).max(0)).getOrElse(0)
Statistics(
sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
rowCount = Some(rowCount))
}

override def visitGlobalLimit(p: GlobalLimit): Statistics = {
val limit = p.limitExpr.eval().asInstanceOf[Int]
val childStats = p.child.stats
@@ -56,6 +56,22 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = windowsStats)
}

test("offset estimation: offset < child's rowCount") {
val offset = Offset(Literal(2), plan)
checkStats(offset, Statistics(sizeInBytes = 96, rowCount = Some(8)))
}

test("offset estimation: offset > child's rowCount") {
val offset = Offset(Literal(20), plan)
checkStats(offset, Statistics(sizeInBytes = 1, rowCount = Some(0)))
}

test("offset estimation: offset = 0") {
val offset = Offset(Literal(0), plan)
// Offset is equal to zero, so Offset's stats is equal to its child's stats.
checkStats(offset, plan.stats.copy(attributeStats = AttributeMap(Nil)))
}

test("limit estimation: limit < child's rowCount") {
val localLimit = LocalLimit(Literal(2), plan)
val globalLimit = GlobalLimit(Literal(2), plan)
@@ -42,6 +42,7 @@ class SparkPlanner(
DataSourceV2Strategy ::
FileSourceStrategy ::
DataSourceStrategy(conf) ::
SpecialOffset ::
SpecialLimits ::
Aggregation ::
Window ::
@@ -77,6 +77,22 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

/**
* Plans special cases of offset operators.
*/
object SpecialOffset extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ReturnAnswer(rootPlan) => rootPlan match {
case logical.Offset(IntegerLiteral(offset), child) =>
CollectOffsetExec(offset, planLater(child)) :: Nil
case _ => Nil
}
case logical.Offset(IntegerLiteral(offset), child) =>
OffsetExec(offset, planLater(child)) :: Nil
case _ => Nil
}
}

/**
* Plans special cases of limit operators.
*/
@@ -722,6 +738,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data, _) =>
LocalTableScanExec(output, data) :: Nil
case logical.Offset(IntegerLiteral(offset), child) =>
execution.OffsetExec(offset, planLater(child)) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}


/**
* Skip the first `offset` elements and collect the remaining elements into a single partition.
* This operator will be used when a logical `Offset` operation is the final operator in a
* logical plan, which happens when the user is collecting results back to the driver.
*/
case class CollectOffsetExec(offset: Int, child: SparkPlan) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = SinglePartition

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def executeCollect(): Array[InternalRow] = child.executeCollect.drop(offset)
Member

Can we avoid collecting everything on the driver side?

Contributor Author
@beliefer beliefer Sep 10, 2019

Do you have a good suggestion? Thanks.

Contributor Author

@HyukjinKwon I have resolved the issue of collecting the results to the driver.


private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

protected override def doExecute(): RDD[InternalRow] = {
sparkContext.parallelize(executeCollect(), 1)
}

}

/**
* Skip the first `offset` elements of the child's output.
*/
case class OffsetExec(offset: Int, child: SparkPlan) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

protected override def doExecute(): RDD[InternalRow] = {
val rdd = child.execute()
val arr = rdd.take(offset)
rdd.filter(!arr.contains(_))
Contributor

We need some design work on how to implement OFFSET on a distributed data set. The current approach works, but it's not robust: it needs to broadcast arr (which can be large if the OFFSET is large) and relies on object equality.

We may need to follow how LIMIT is implemented in Spark.
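
For illustration (an editor's sketch with plain Scala collections, not code from this PR), the object-equality pitfall: any later row that compares equal to one of the skipped rows is dropped as well.

  // Stand-ins for rows; note the duplicate value.
  val rows = Seq(1, 1, 2, 3)
  val offset = 1
  val head = rows.take(offset)                       // Seq(1)
  val result = rows.filter(r => !head.contains(r))   // mirrors rdd.filter(!arr.contains(_))
  // result == Seq(2, 3): the second 1 is dropped as well,
  // whereas OFFSET 1 should return Seq(1, 2, 3).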

Contributor Author
@beliefer beliefer Oct 9, 2019

Yes, I think so.
I looked at the implementation of LIMIT, but it seems OFFSET can't follow the same approach.
Also, OFFSET is more likely than LIMIT to produce large amounts of data.
An immature suggestion: put an upper bound on the OFFSET value.

Contributor

A rough idea:

  1. Get the numRecords of the first partition.
  2. If that numRecords is bigger than the OFFSET, go to step 4.
  3. Get the numRecords of more partitions (quadruple and retry, like LIMIT) until the total numRecords is bigger than the OFFSET.
  4. Now we have the numRecords of some head partitions whose total exceeds the OFFSET, so we can easily skip the head records.

If we have accurate per-partition numRecords statistics, we can go step 4 directly.
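
A rough sketch of steps 1-3 (an editor's illustration, not code from this PR), assuming an rdd: RDD[T] and an offset: Int; the hypothetical helper countHeadPartitions scans four times as many partitions each round until the running count covers the offset:

  import org.apache.spark.rdd.RDD

  def countHeadPartitions[T](rdd: RDD[T], offset: Int): Map[Int, Long] = {
    val numParts = rdd.getNumPartitions
    var counts = Map.empty[Int, Long]    // partition index -> row count
    var scanned = 0
    var batch = 1
    while (counts.values.sum < offset && scanned < numParts) {
      val range = scanned until math.min(scanned + batch, numParts)
      // Count the rows of the selected partitions in a single job.
      val newCounts = rdd.sparkContext.runJob(
        rdd, (iter: Iterator[T]) => iter.size.toLong, range)
      counts ++= range.zip(newCounts)
      scanned = range.end
      batch *= 4                         // quadruple and retry, like LIMIT
    }
    counts                               // step 4 can derive per-partition skip counts from this
  }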

Contributor Author

@cloud-fan That is a better idea. I will refactor the current implementation.

Contributor Author
@beliefer beliefer Oct 10, 2019

@cloud-fan There is a problem: the partition index and the order of the data are inconsistent.
I have a new implementation, but it does not work well because I can't guarantee the order of the output produced by the child plans.

  protected override def doExecute(): RDD[InternalRow] = {
    val rdd = child.execute()
    val partIdxToCountItr = rdd.mapPartitionsWithIndex { (partIdx, iter) =>
      val partIdxToRowCount = scala.collection.mutable.Map[Int, Int]()
      var rowCount = 0
      while (iter.hasNext) {
        rowCount += 1
        iter.next()
      }
      partIdxToRowCount.put(partIdx, rowCount)
      partIdxToRowCount.iterator
    }.collect().iterator
    var remainder = offset
    val partIdxToSkipCount = scala.collection.mutable.Map[Int, Int]()
    while (partIdxToCountItr.hasNext && remainder > 0) {
      val (partIdx, count) = partIdxToCountItr.next()
      if (count > remainder) {
        partIdxToSkipCount(partIdx) = remainder
        remainder = 0
      } else {
        partIdxToSkipCount(partIdx) = count
        remainder -= count
      }
    }
    val broadcastPartIdxToSkipCount = sparkContext.broadcast(partIdxToSkipCount)
    rdd.mapPartitionsWithIndex { (partIdx, iter) =>
      val skipCount = broadcastPartIdxToSkipCount.value.getOrElse(partIdx, 0)
      iter.drop(skipCount)
    }
  }

Contributor Author

So, I want to know: how can I guarantee the order?

}

}