[SPARK-39057][SQL] Offset could work without Limit #36417

Closed · wants to merge 10 commits
@@ -96,7 +96,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
CollapseWindow,
CombineFilters,
EliminateLimits,
- RewriteOffsets,
CombineUnions,
// Constant folding and strength reduction
OptimizeRepartition,
@@ -673,7 +672,7 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
}

/**
- * Pushes down [[LocalLimit]] beneath UNION ALL and joins.
+ * Pushes down [[LocalLimit]] beneath UNION ALL, OFFSET and joins.
*/
object LimitPushDown extends Rule[LogicalPlan] {

@@ -750,6 +749,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
+ // Merge the offset value and limit value into LocalLimit and push LocalLimit down through Offset.
+ case LocalLimit(le, Offset(oe @ IntegerLiteral(offset), grandChild)) if offset > 0 =>
+   Offset(oe, LocalLimit(Add(le, oe), grandChild))

Contributor: I think the pushdown is correct even if offset == 0. We don't need the `if offset > 0` guard.
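As a sanity check on this rewrite, here is a hypothetical worked example using plain Scala collections in place of a partition's rows (the values of le and oe are made up for illustration):

// LocalLimit(le, Offset(oe, child)) vs. Offset(oe, LocalLimit(le + oe, child)):
// on a single ordered partition, both reduce to drop/take over the same rows.
val rows = (1 to 10).toList                  // stand-in for the child's output
val (le, oe) = (3, 2)                        // LIMIT 3 OFFSET 2
val original  = rows.drop(oe).take(le)       // offset first, then limit -> List(3, 4, 5)
val rewritten = rows.take(le + oe).drop(oe)  // keep le + oe rows, then drop oe -> List(3, 4, 5)
assert(original == rewritten)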
+ // Eliminate Offset if the offset value equals 0.
+ case Offset(IntegerLiteral(0), child) => child
Contributor: We can add a new rule EliminateOffsets for this. And there is one more idea: if child.maxRows < offset, then we can simply turn the query into an empty local relation.
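A minimal sketch of the EliminateOffsets rule the reviewer suggests, assuming the Offset logical node from this PR and catalyst's maxRows/LocalRelation APIs (an illustration, not the implementation that eventually landed):

import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Offset}
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateOffsets extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // OFFSET 0 is a no-op; keep the child as-is.
    case Offset(IntegerLiteral(0), child) => child
    // If the child can never produce more than `offset` rows, the
    // result is provably empty, so plan an empty local relation.
    case Offset(IntegerLiteral(offset), child) if child.maxRows.exists(_ <= offset) =>
      LocalRelation(child.output, data = Seq.empty, isStreaming = child.isStreaming)
  }
}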

}
}

@@ -1870,23 +1874,6 @@ object EliminateLimits extends Rule[LogicalPlan] {
}
}

- /**
-  * Rewrite [[Offset]] by eliminate [[Offset]] or merge offset value and limit value into
-  * [[LocalLimit]]. See [[Limit]] for more information about the difference between
-  * [[LocalLimit]] and [[GlobalLimit]].
-  */
- object RewriteOffsets extends Rule[LogicalPlan] {
-   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-     case localLimit @ LocalLimit(le, Offset(oe, grandChild)) =>
-       val offset = oe.eval().asInstanceOf[Int]
-       if (offset == 0) {
-         localLimit.withNewChildren(Seq(grandChild))
-       } else {
-         Offset(oe, LocalLimit(Add(le, oe), grandChild))
-       }
-   }
- }

/**
* Check if there any cartesian products between joins of any type in the optimized plan tree.
* Throw an error if a cartesian product is found without an explicit cross join specified.
@@ -92,8 +92,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
CollectLimitExec(limit, planLater(child)) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
- TakeOrderedAndProjectExec(
-   limit, order, child.output, planLater(child), offset) :: Nil
+ TakeOrderedAndProjectExec(
+   limit, order, child.output, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
Project(projectList, Sort(order, true, child)))
if limit + offset < conf.topKSortFallbackThreshold =>
@@ -128,8 +128,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child), offset) :: Nil
case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
GlobalLimitAndOffsetExec(limit, offset, planLater(child)) :: Nil
- case logical.Offset(IntegerLiteral(offset), child) =>
-   GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil
case _ =>
Nil
}
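The limit + offset < conf.topKSortFallbackThreshold guard above reflects that the top-K path must buffer limit + offset rows before discarding the first offset of them. A hypothetical worked example with plain Scala collections (data and values are made up):

// For `ORDER BY x LIMIT 3 OFFSET 20`, sorting everything is unnecessary:
// keep the smallest (limit + offset) rows, then drop the first `offset`.
val data = scala.util.Random.shuffle((1 to 100).toList)
val (limit, offset) = (3, 20)
val topK = data.sorted.take(limit + offset)     // top-K of size 23, not a full sort
assert(topK.drop(offset) == (21 to 23).toList)  // rows 21..23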
@@ -820,6 +818,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
+ case logical.Offset(IntegerLiteral(offset), child) =>
+   GlobalLimitAndOffsetExec(offset = offset, child = planLater(child)) :: Nil
case union: logical.Union =>
execution.UnionExec(union.children.map(planLater)) :: Nil
case g @ logical.Generate(generator, _, outer, _, _, child) =>
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala (28 additions, 34 deletions)
@@ -78,10 +78,14 @@ case class CollectLimitExec(limit: Int = -1, child: SparkPlan, offset: Int = 0)
val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
childRDD
} else {
- val locallyLimited = if (offset > 0) {
-   childRDD.mapPartitionsInternal(_.take(limit + offset))
- } else {
-   childRDD.mapPartitionsInternal(_.take(limit))
+ val locallyLimited = if (limit >= 0) {
+   if (offset > 0) {
+     childRDD.mapPartitionsInternal(_.take(limit + offset))
+   } else {
+     childRDD.mapPartitionsInternal(_.take(limit))
+   }
+ } else {
+   childRDD
}
new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
@@ -92,10 +96,14 @@ case class CollectLimitExec(limit: Int = -1, child: SparkPlan, offset: Int = 0)
writeMetrics),
readMetrics)
}
- if (offset > 0) {
-   singlePartitionRDD.mapPartitionsInternal(_.drop(offset).take(limit))
- } else {
-   singlePartitionRDD.mapPartitionsInternal(_.take(limit))
+ if (limit >= 0) {
+   if (offset > 0) {
+     singlePartitionRDD.mapPartitionsInternal(_.drop(offset).take(limit))
+   } else {
+     singlePartitionRDD.mapPartitionsInternal(_.take(limit))
+   }
+ } else {
+   singlePartitionRDD.mapPartitionsInternal(_.drop(offset))
}
}
}
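In this file, limit == -1 is the sentinel for "no limit", so an OFFSET-only query only drops rows and never calls take. A minimal sketch of the intended semantics over plain iterators (the helper name and data are hypothetical):

// Mirrors CollectLimitExec's single-partition step, as a pure function.
def applyLimitOffset(rows: Iterator[Int], limit: Int, offset: Int): Iterator[Int] =
  if (limit >= 0) rows.drop(offset).take(limit)  // LIMIT with optional OFFSET
  else rows.drop(offset)                         // OFFSET without LIMIT (limit == -1)

applyLimitOffset(Iterator(1, 2, 3, 4, 5), -1, 2).toList  // List(3, 4, 5)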
@@ -223,53 +231,38 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
*/
case class GlobalLimitAndOffsetExec(
limit: Int = -1,
- offset: Int = 0,
+ offset: Int,
child: SparkPlan) extends BaseLimitExec {
- assert(limit >= 0 || (limit == -1 && offset > 0))
+ assert(offset > 0)

override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil

override def doExecute(): RDD[InternalRow] = if (limit >= 0) {
- if (offset > 0) {
-   child.execute().mapPartitions(iter => iter.take(limit + offset).drop(offset))
- } else {
-   child.execute().mapPartitions(iter => iter.take(limit))
- }
+ child.execute().mapPartitions(iter => iter.take(limit + offset).drop(offset))
@cloud-fan (Contributor), May 6, 2022: not related to this PR, but since we are touching here, let's use mapPartitionsInternal which is more efficient.

Author: OK
} else {
child.execute().mapPartitions(iter => iter.drop(offset))
}

private lazy val skipTerm = BaseLimitExec.newLimitCountTerm()

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+ ctx.addMutableState(
+   CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
if (limit >= 0) {
// The counter name is already obtained by the upstream operators via `limitNotReachedChecks`.
// Here we have to inline it to not change its name. This is fine as we won't have many limit
// operators in one query.
ctx.addMutableState(
CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
- if (offset > 0) {
-   ctx.addMutableState(
-     CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
-   s"""
-      | if ($skipTerm < $offset) {
-      |   $skipTerm += 1;
-      | } else if ($countTerm < $limit) {
-      |   $countTerm += 1;
-      |   ${consume(ctx, input)}
-      | }
-    """.stripMargin
- } else {
-   s"""
-      | if ($countTerm < $limit) {
-      |   $countTerm += 1;
-      |   ${consume(ctx, input)}
-      | }
-    """.stripMargin
- }
+ s"""
+    | if ($skipTerm < $offset) {
+    |   $skipTerm += 1;
+    | } else if ($countTerm < $limit) {
+    |   $countTerm += 1;
+    |   ${consume(ctx, input)}
+    | }
+  """.stripMargin
} else {
- ctx.addMutableState(
-   CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
s"""
| if ($skipTerm < $offset) {
| $skipTerm += 1;
@@ -295,7 +288,8 @@ case class TakeOrderedAndProjectExec(
limit: Int,
sortOrder: Seq[SortOrder],
projectList: Seq[NamedExpression],
- child: SparkPlan, offset: Int = 0) extends UnaryExecNode {
+ child: SparkPlan,
+ offset: Int = 0) extends UnaryExecNode {

override def output: Seq[Attribute] = {
projectList.map(_.toAttribute)
@@ -15,15 +15,15 @@ SELECT '' AS two, unique1, unique2, stringu1
SELECT '' AS three, unique1, unique2, stringu1
FROM onek WHERE unique1 > 100
ORDER BY unique1 LIMIT 3 OFFSET 20;
- SELECT '' AS zero, unique1, unique2, stringu1
- FROM onek WHERE unique1 < 50
- ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
- SELECT '' AS eleven, unique1, unique2, stringu1
- FROM onek WHERE unique1 < 50
- ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
- -- SELECT '' AS ten, unique1, unique2, stringu1
- -- FROM onek
- -- ORDER BY unique1 OFFSET 990;
+ SELECT '' AS zero, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 8 OFFSET 99;
+ SELECT '' AS eleven, unique1, unique2, stringu1
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 20 OFFSET 39;
+ SELECT '' AS ten, unique1, unique2, stringu1
+ FROM onek
+ ORDER BY unique1 OFFSET 990;
-- SELECT '' AS five, unique1, unique2, stringu1
-- FROM onek
-- ORDER BY unique1 OFFSET 990 LIMIT 5;
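With the planner now handling Offset on its own, the previously commented-out query can be re-enabled, as the diff above shows. A hypothetical usage sketch, assuming a SparkSession named spark with the test's onek table registered:

// OFFSET without LIMIT, as exercised by the re-enabled test query above.
spark.sql("SELECT unique1 FROM onek ORDER BY unique1 OFFSET 990").show()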
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
- -- Number of queries: 12
+ -- Number of queries: 13


-- !query
@@ -52,8 +52,8 @@ struct<three:string,unique1:int,unique2:int,stringu1:string>

-- !query
SELECT '' AS zero, unique1, unique2, stringu1
- FROM onek WHERE unique1 < 50
- ORDER BY unique1 DESC LIMIT 8 OFFSET 99
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 8 OFFSET 99
-- !query schema
struct<zero:string,unique1:int,unique2:int,stringu1:string>
-- !query output
@@ -62,8 +62,8 @@ struct<zero:string,unique1:int,unique2:int,stringu1:string>

-- !query
SELECT '' AS eleven, unique1, unique2, stringu1
- FROM onek WHERE unique1 < 50
- ORDER BY unique1 DESC LIMIT 20 OFFSET 39
+ FROM onek WHERE unique1 < 50
+ ORDER BY unique1 DESC LIMIT 20 OFFSET 39
-- !query schema
struct<eleven:string,unique1:int,unique2:int,stringu1:string>
-- !query output
@@ -80,6 +80,25 @@ struct<eleven:string,unique1:int,unique2:int,stringu1:string>
0 998 AAAAAA


+ -- !query
+ SELECT '' AS ten, unique1, unique2, stringu1
+ FROM onek
+ ORDER BY unique1 OFFSET 990
+ -- !query schema
+ struct<ten:string,unique1:int,unique2:int,stringu1:string>
+ -- !query output
+ 990 369 CMAAAA
+ 991 426 DMAAAA
+ 992 363 EMAAAA
+ 993 661 FMAAAA
+ 994 695 GMAAAA
+ 995 144 HMAAAA
+ 996 258 IMAAAA
+ 997 21 JMAAAA
+ 998 549 KMAAAA
+ 999 152 LMAAAA


-- !query
SELECT '' AS five, unique1, unique2, stringu1
FROM onek