Remove offset only
beliefer committed Feb 4, 2020
1 parent 4f22a7a commit 1149727
Showing 8 changed files with 33 additions and 125 deletions.
@@ -349,13 +349,9 @@ trait CheckAnalysis extends PredicateHelper {
 
         case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
           && o.children.exists(_.isInstanceOf[Offset]) =>
-          if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
-            failAnalysis(
-              s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
-                 | clause found in: ${o.nodeName}. If you know exactly that OFFSET clause
-                 | does not cause excessive overhead and still want to use it, set
-                 | ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
-          }
+          failAnalysis(
+            s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
+               | clause found in: ${o.nodeName}.""".stripMargin)
 
         case _: Union | _: SetOperation if operator.children.length > 1 =>
           def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
@@ -787,13 +783,9 @@ trait CheckAnalysis extends PredicateHelper {
     plan match {
       case Offset(offsetExpr, _) =>
         checkLimitLikeClause("offset", offsetExpr)
-        if (!SQLConf.get.forceUsingOffsetWithoutLimit) {
-          failAnalysis(
-            s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
-               | clause is found to be the outermost node. If you know exactly that OFFSET
-               | clause does not cause excessive overhead and still want to use it, set
-               | ${SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT.key} to true.""".stripMargin)
-        }
+        failAnalysis(
+          s"""Only the OFFSET clause is allowed in the LIMIT clause, but the OFFSET
+             | clause is found to be the outermost node.""".stripMargin)
       case _ =>
     }
   }
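Both hunks enforce the same rule: an Offset node is only legal directly under a limit, and the config escape hatch is gone. A rough, self-contained sketch of that rule, using made-up plan classes rather than Spark's real CheckAnalysis API:

```scala
// Hypothetical mini plan tree, only to illustrate the check above.
sealed trait Plan { def children: Seq[Plan] }
case class Scan(table: String) extends Plan { val children = Nil }
case class Offset(n: Int, child: Plan) extends Plan { val children = Seq(child) }
case class Limit(n: Int, offset: Int, child: Plan) extends Plan { val children = Seq(child) }

def checkOffset(plan: Plan, outermost: Boolean = true): Unit = {
  plan match {
    case Offset(_, _) if outermost =>
      // Mirrors the second hunk: a bare OFFSET as the outermost node is rejected.
      sys.error("Only the OFFSET clause is allowed in the LIMIT clause, " +
        "but the OFFSET clause is found to be the outermost node.")
    case p if !p.isInstanceOf[Limit] && p.children.exists(_.isInstanceOf[Offset]) =>
      // Mirrors the first hunk: OFFSET under anything other than a limit is rejected.
      sys.error("Only the OFFSET clause is allowed in the LIMIT clause, " +
        s"but the OFFSET clause found in: ${p.getClass.getSimpleName}.")
    case _ =>
  }
  plan.children.foreach(checkOffset(_, outermost = false))
}

checkOffset(Limit(5, 10, Offset(10, Scan("t"))))  // accepted: OFFSET under a limit
// checkOffset(Offset(10, Scan("t")))             // throws: offset-only query
```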
@@ -1405,10 +1405,6 @@ object RewriteOffsets extends Rule[LogicalPlan] {
       GlobalLimit(le, Greatest(Seq(noe, oe)), grandChild)
     case LocalLimit(le, oe, Offset(noe, grandChild)) =>
       Offset(noe, LocalLimit(le, Greatest(Seq(noe, oe)), grandChild))
-    case Offset(oe, Offset(noe, grandChild)) =>
-      Offset(Greatest(Seq(noe, oe)), grandChild)
-    case Offset(oe @ Literal(v, IntegerType), child) =>
-      Limit(Limit.INVALID_LIMIT, oe, child)
   }
 }
 
@@ -797,7 +797,6 @@ case class Pivot(
  * So we introduced LocalLimit and GlobalLimit in the logical plan node for limit pushdown.
  */
 object Limit {
-  val INVALID_LIMIT = Literal(-1)
   def apply(
       limitExpr: Expression,
       offsetExpr: Expression = Literal(0),
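With offset-only plans rejected at analysis time, the `INVALID_LIMIT` sentinel is removed together with every branch that checked for it, so downstream code can always assume a real limit value. A hedged, collection-level sketch of what the sentinel used to buy and why dropping it simplifies the consumers (illustrative names, `Seq[Int]` standing in for rows):

```scala
// Before: a magic -1 limit meant "no limit, offset only", forcing a branch everywhere.
def sliceWithSentinel(rows: Seq[Int], limit: Int, offset: Int): Seq[Int] =
  if (limit == -1) rows.drop(offset)          // offset-only path, now unreachable
  else rows.drop(offset).take(limit)          // normal LIMIT ... OFFSET window

// After: the analyzer guarantees a real limit, so a single expression suffices.
def slice(rows: Seq[Int], limit: Int, offset: Int): Seq[Int] =
  rows.drop(offset).take(limit)
```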
@@ -2158,12 +2158,6 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.addPartitionInBatch.size must be positive")
     .createWithDefault(100)
 
-  val FORCE_USING_OFFSET_WITHOUT_LIMIT = buildConf("spark.sql.forceUsingOffsetWithoutLimit")
-    .doc("When this option is set to true, although OFFSET may have large overhead, " +
-      "still use it. Otherwise, an analysis exception is thrown.")
-    .booleanConf
-    .createWithDefault(false)
-
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -2736,8 +2730,6 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
-  def forceUsingOffsetWithoutLimit: Boolean = getConf(SQLConf.FORCE_USING_OFFSET_WITHOUT_LIMIT)
-
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
@@ -82,13 +82,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
         case Limit(IntegerLiteral(limit), IntegerLiteral(offset), Sort(order, true, child))
-            if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+            if limit < conf.topKSortFallbackThreshold =>
           TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
         case Limit(
             IntegerLiteral(limit),
             IntegerLiteral(offset),
             Project(projectList, Sort(order, true, child)))
-            if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+            if limit < conf.topKSortFallbackThreshold =>
           TakeOrderedAndProjectExec(limit, offset, order, projectList, planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
           CollectLimitExec(limit, offset, planLater(child)) :: Nil
@@ -97,13 +97,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), IntegerLiteral(offset), Sort(order, true, child))
-          if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+          if limit < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(limit, offset, order, child.output, planLater(child)) :: Nil
       case Limit(
           IntegerLiteral(limit),
           IntegerLiteral(offset),
           Project(projectList, Sort(order, true, child)))
-          if limit != Limit.INVALID_LIMIT.value && limit < conf.topKSortFallbackThreshold =>
+          if limit < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(limit, offset, order, projectList, planLater(child)) :: Nil
       case _ => Nil
     }
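The dropped `limit != Limit.INVALID_LIMIT.value` guard existed only because a sentinel limit could reach the planner; now `limit` is always a real value, and the only remaining question is whether it is small enough for the top-K path. A rough standalone sketch, on plain collections with ascending order and illustrative names, of the trade-off the `topKSortFallbackThreshold` guard is choosing between:

```scala
// Top-K style: keep at most (limit + offset) smallest rows while scanning,
// then drop the first `offset` of them -- cheap when limit + offset is small.
def orderedWindowTopK(rows: Iterator[Int], limit: Int, offset: Int): Seq[Int] = {
  val k = limit + offset
  val topK = rows.foldLeft(Vector.empty[Int]) { (kept, row) =>
    (kept :+ row).sorted.take(k)   // bounded buffer of the k smallest rows so far
  }
  topK.drop(offset)
}

// Fallback style: sort everything, then slice out the window -- the kind of plan
// the strategy falls back to when limit >= topKSortFallbackThreshold.
def orderedWindowFullSort(rows: Seq[Int], limit: Int, offset: Int): Seq[Int] =
  rows.sorted.drop(offset).take(limit)

// Both produce the same window:
// orderedWindowTopK(Iterator(9, 1, 7, 3, 5), limit = 2, offset = 1) == Vector(3, 5)
// orderedWindowFullSort(Seq(9, 1, 7, 3, 5), limit = 2, offset = 1) == Seq(3, 5)
```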
72 changes: 19 additions & 53 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -46,11 +46,7 @@ case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends L
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = {
-    if (limit == Limit.INVALID_LIMIT.value) {
-      child.executeCollect().drop(offset)
-    } else {
-      child.executeTake(limit + offset).drop(offset)
-    }
+    child.executeTake(limit + offset).drop(offset)
   }
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
   private lazy val writeMetrics =
@@ -59,11 +55,7 @@ case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends L
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
   protected override def doExecute(): RDD[InternalRow] = {
-    val locallyLimited = if (limit == Limit.INVALID_LIMIT.value) {
-      child.execute()
-    } else {
-      child.execute().mapPartitionsInternal(_.take(limit + offset))
-    }
+    val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit + offset))
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
         locallyLimited,
@@ -72,11 +64,7 @@ case class CollectLimitExec(limit: Int, offset: Int, child: SparkPlan) extends L
         serializer,
         writeMetrics),
       readMetrics)
-    if (limit == Limit.INVALID_LIMIT.value) {
-      shuffled.mapPartitionsInternal(_.drop(offset))
-    } else {
-      shuffled.mapPartitionsInternal(_.drop(offset).take(limit))
-    }
+    shuffled.mapPartitionsInternal(_.drop(offset).take(limit))
   }
 }
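The simplification above leans on a small invariant: if every partition keeps only its first `limit + offset` rows, the single post-shuffle partition still contains the whole requested window. A toy sketch of that invariant with plain collections (hypothetical data, partitions assumed to concatenate in order, not Spark's RDD API):

```scala
// Three "partitions" of already-ordered rows.
val partitions = Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7, 8, 9))
val limit = 2
val offset = 3

// Per-partition cap, mirroring mapPartitionsInternal(_.take(limit + offset)).
val locallyLimited = partitions.map(_.take(limit + offset))

// "Shuffle" everything into one partition, then apply the global window,
// mirroring shuffled.mapPartitionsInternal(_.drop(offset).take(limit)).
val result = locallyLimited.flatten.drop(offset).take(limit)
assert(result == Seq(4, 5)) // LIMIT 2 OFFSET 3 over rows 1..9
```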

@@ -150,11 +138,7 @@ case class LocalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extends
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def doExecute(): RDD[InternalRow] = {
-    if (limit == Limit.INVALID_LIMIT.value) {
-      child.execute()
-    } else {
-      child.execute().mapPartitions { iter => iter.take(limit + offset)}
-    }
+    child.execute().mapPartitions { iter => iter.take(limit + offset)}
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
@@ -163,16 +147,12 @@ case class LocalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extends
     // operators in one query.
     ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
     ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
-    if (limit == Limit.INVALID_LIMIT.value) {
-      s"${consume(ctx, input)}"
-    } else {
-      s"""
-         | if ($countTerm < ${limit + offset}) {
-         |   $countTerm += 1;
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin
-    }
+    s"""
+       | if ($countTerm < ${limit + offset}) {
+       |   $countTerm += 1;
+       |   ${consume(ctx, input)}
+       | }
+     """.stripMargin
   }
 }

@@ -189,11 +169,7 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extend
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def doExecute(): RDD[InternalRow] = {
-    val rdd = if (limit == Limit.INVALID_LIMIT.value) {
-      child.execute()
-    } else {
-      child.execute().mapPartitions { iter => iter.take(limit + offset)}
-    }
+    val rdd = child.execute().mapPartitions { iter => iter.take(limit + offset)}
     val skips = rdd.take(offset)
     rdd.filter(!skips.contains(_))
   }
@@ -204,24 +180,14 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0) extend
     // operators in one query.
     ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
     ctx.addMutableState(CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
-    if (limit == Limit.INVALID_LIMIT.value) {
-      s"""
-         | if ($skipTerm < $offset) {
-         |   $skipTerm += 1;
-         | } else {
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin
-    } else {
-      s"""
-         | if ($skipTerm < $offset) {
-         |   $skipTerm += 1;
-         | } else if ($countTerm < $limit) {
-         |   $countTerm += 1;
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin
-    }
+    s"""
+       | if ($skipTerm < $offset) {
+       |   $skipTerm += 1;
+       | } else if ($countTerm < $limit) {
+       |   $countTerm += 1;
+       |   ${consume(ctx, input)}
+       | }
+     """.stripMargin
   }
 }

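The single remaining code-generation template above boils down to a per-row skip-then-count loop. A minimal Scala equivalent of that loop (an illustrative helper, not the generated Java):

```scala
// Skip the first `offset` rows, then emit at most `limit` rows,
// tracking the same two counters ($skipTerm / $countTerm) as the template.
def limitWithOffset[T](rows: Iterator[T], limit: Int, offset: Int): Iterator[T] = {
  var skipped = 0
  var emitted = 0
  rows.filter { _ =>
    if (skipped < offset) { skipped += 1; false }      // still skipping
    else if (emitted < limit) { emitted += 1; true }   // inside the window
    else false                                         // past the limit
  }
}

// limitWithOffset(Iterator(1, 2, 3, 4, 5), limit = 2, offset = 1).toList == List(2, 3)
```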
@@ -21,11 +21,9 @@ SELECT '' AS zero, unique1, unique2, stringu1
 SELECT '' AS eleven, unique1, unique2, stringu1
 FROM onek WHERE unique1 < 50
 ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
-SET spark.sql.forceUsingOffsetWithoutLimit = true;
-SELECT '' AS ten, unique1, unique2, stringu1
-FROM onek
-ORDER BY unique1 OFFSET 990;
-SET spark.sql.forceUsingOffsetWithoutLimit = false;
+-- SELECT '' AS ten, unique1, unique2, stringu1
+-- FROM onek
+-- ORDER BY unique1 OFFSET 990;
 -- SELECT '' AS five, unique1, unique2, stringu1
 -- FROM onek
 -- ORDER BY unique1 OFFSET 990 LIMIT 5;
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 15
+-- Number of queries: 12
 
 
 -- !query
@@ -80,41 +80,6 @@ struct<eleven:string,unique1:int,unique2:int,stringu1:string>
 0 998 AAAAAA
 
 
--- !query
-SET spark.sql.forceUsingOffsetWithoutLimit = true
--- !query schema
-struct<key:string,value:string>
--- !query output
-spark.sql.forceUsingOffsetWithoutLimit true
-
-
--- !query
-SELECT '' AS ten, unique1, unique2, stringu1
-FROM onek
-ORDER BY unique1 OFFSET 990
--- !query schema
-struct<ten:string,unique1:int,unique2:int,stringu1:string>
--- !query output
-990 369 CMAAAA
-991 426 DMAAAA
-992 363 EMAAAA
-993 661 FMAAAA
-994 695 GMAAAA
-995 144 HMAAAA
-996 258 IMAAAA
-997 21 JMAAAA
-998 549 KMAAAA
-999 152 LMAAAA
-
-
--- !query
-SET spark.sql.forceUsingOffsetWithoutLimit = false
--- !query schema
-struct<key:string,value:string>
--- !query output
-spark.sql.forceUsingOffsetWithoutLimit false
-
-
 -- !query
 SELECT '' AS five, unique1, unique2, stringu1
 FROM onek
