[SPARK-21717][SQL] Decouple consume functions of physical operators in whole-stage codegen #18931
Test build #80581 has finished for PR 18931 at commit
@@ -149,14 +149,65 @@ trait CodegenSupport extends SparkPlan {

ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)

// Under certain conditions, we can put the logic to consume the rows of this operator into
Could you elaborate on "certain conditions" in the comment if you have time?
Added more comments to elaborate on the idea.
Test build #80584 has finished for PR 18931 at commit
Test build #80589 has finished for PR 18931 at commit
Test build #80590 has finished for PR 18931 at commit
Ran with the same benchmark in #18810. Before this patch:
After this patch:
This patch can significantly reduce the performance degradation that occurs when the generated function is too long to be optimized by JIT.
Ran with Before this patch:
After this patch:
Test build #80594 has finished for PR 18931 at commit
Test build #80595 has finished for PR 18931 at commit
Test build #80605 has started for PR 18931 at commit
Test build #80607 has finished for PR 18931 at commit
Test build #80612 has finished for PR 18931 at commit
retest this please.
Test build #80616 has finished for PR 18931 at commit
Test build #80648 has finished for PR 18931 at commit
Test build #80654 has finished for PR 18931 at commit
// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
// 2. The parent uses all variables in output. we can't defer variable evaluation when consume
//    in another function.
// 3. The output variables are not empty. If it's empty, we don't bother to do that.
why this? Logically an operator can still have complex consume method even if it doesn't have output.
Sounds correct to me, logically, although I have no clear idea what such an operator would actually be.
// 2. The parent uses all variables in output. we can't defer variable evaluation when consume
//    in another function.
// 3. The output variables are not empty. If it's empty, we don't bother to do that.
// 4. We don't use row variable. The construction of row uses deferred variable evaluation. We
I think what we need is that inputVars are all materialized, which can be guaranteed by requireAllOutput and outputVars != null.
Seems to me outputVars != null isn't necessary either. When it is null, row can't be null. inputVars will bind to the row columns and be evaluated before calling the created method.
val consumeFunc =
  if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
      requireAllOutput && ctx.isValidParamLength(output)) {
    constructDoConsumeFunction(ctx, inputVars)
we should pass row to this function; if it's non-null, we can save a projection.
maybe we should create a method for generating rowVar, so that we can use it in both consume and constructDoConsumeFunction
Good point.
val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
val rowVar = ExprCode(ev.code.trim, "false", ev.value)

val doConsume = ctx.freshName("doConsume")
shall we put the operator name in this function name?
The freshName here will add variablePrefix before doConsume. So it already has the operator name, e.g., agg_doConsume.
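To illustrate the prefixing described in the reply above, here is a minimal sketch of a fresh-name generator. It is not Spark's CodegenContext: the class name NameGeneratorSketch and the numeric-suffix rule for repeated names are assumptions made for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a codegen name generator (not Spark's CodegenContext).
final class NameGeneratorSketch(var prefix: String) {
  // How many times each fully prefixed name has been handed out so far.
  private val counters = mutable.Map.empty[String, Int].withDefaultValue(0)

  def freshName(name: String): String = {
    // The operator's prefix goes in front of the requested name.
    val fullName = if (prefix.isEmpty) name else s"${prefix}_$name"
    val id = counters(fullName)
    counters(fullName) = id + 1
    // Assumed rule: the first use keeps the bare name, later uses get a numeric suffix.
    if (id == 0) fullName else s"$fullName$id"
  }
}
```

With the prefix set to agg, the first request for doConsume yields agg_doConsume, so the operator name is already part of the function name.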
private def constructDoConsumeFunction(
    ctx: CodegenContext,
    inputVars: Seq[ExprCode]): String = {
  val (callingParams, arguList, inputVarsInFunc) =
I feel it's cleaner to return paramNames, paramTypes, paramVars, then we can simply do
void $doConsume(paramTypes.zip(paramNames).map(i => i._1 + " " + i._2).mkString(", "))
and
doConsumeFuncName(paramNames.mkString(", "))
inside constructConsumeParameters we can just create 3 mutable collections and go through variables to fill these collections.
Sounds cleaner. I need to change it a little because the arguments and parameters are not the same. Some variables cannot be parameterized, e.g., constants or statements.
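The distinction between arguments and parameters in this exchange can be sketched as follows. This is a simplified illustration, not the code in the PR: GenVar, ConsumeParamsSketch, the expr_N naming and the isLiteral flag are all hypothetical, and real generated variables carry more state than a type and a value.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a generated variable (not Spark's ExprCode).
final case class GenVar(javaType: String, value: String, isLiteral: Boolean)

object ConsumeParamsSketch {
  /** Returns (call arguments, parameter declarations, names usable inside the function). */
  def build(vars: Seq[GenVar]): (Seq[String], Seq[String], Seq[String]) = {
    // Three mutable collections filled in one pass, as the reviewer suggests.
    val arguments = mutable.ArrayBuffer[String]()
    val parameters = mutable.ArrayBuffer[String]()
    val namesInFunc = mutable.ArrayBuffer[String]()
    vars.zipWithIndex.foreach { case (v, i) =>
      if (v.isLiteral) {
        // A constant is inlined into the function body, so it needs no parameter.
        namesInFunc += v.value
      } else {
        val param = s"expr_$i"
        arguments += v.value
        parameters += s"${v.javaType} $param"
        namesInFunc += param
      }
    }
    (arguments.toSeq, parameters.toSeq, namesInFunc.toSeq)
  }

  /** Java declaration line for the split-out function. */
  def declaration(funcName: String, vars: Seq[GenVar]): String = {
    val params = build(vars)._2.mkString(", ")
    s"private void $funcName($params)"
  }

  /** Java call statement placed where the consume logic used to be inlined. */
  def call(funcName: String, vars: Seq[GenVar]): String = {
    val args = build(vars)._1.mkString(", ")
    s"$funcName($args);"
  }
}
```

Because a constant contributes a name inside the function but neither an argument nor a parameter, the three collections can differ in length, which is why the reply says the arguments and parameters are not the same.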
LGTM except a few comments
Test build #86580 has finished for PR 18931 at commit
@@ -1263,6 +1271,8 @@ class SQLConf extends Serializable with Logging {

def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS)
Add the wholeStage prefix for such flag names.
Sure. Done.
Before merging this PR, we need test cases.
@gatorsmile Thanks. I will address the above code comments first in the next commit and add some test cases in a later commit.
"physical operator into individual methods, instead of a single big method. This can be " +
"used to avoid oversized function that can miss the opportunity of JIT optimization.")
.booleanConf
.createWithDefault(true)
Set to true by default. If there is objection, I can change it to false.
val paramVars = mutable.ArrayBuffer[ExprCode]()

if (row != null) {
  arguments += row
we need to update ctx.isValidParamLength to count this
we should probably have 2 methods for calculating param length and checking param length limitation.
Added an extra unit for row if needed.
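The suggestion to separate calculating the parameter length from checking it could look roughly like the sketch below. The names ParamLengthSketch, calculate and isValid are hypothetical and types are modelled as plain strings. The limit itself is a JVM rule: a method may take at most 255 parameter slots, with long and double taking two slots each and `this` taking one for instance methods.

```scala
object ParamLengthSketch {
  // JVM limit on parameter slots for a single method, including `this`.
  val MaxJvmParamSlots = 255

  // long and double occupy two slots; every other type occupies one.
  def slotsOf(javaType: String): Int = javaType match {
    case "long" | "double" => 2
    case _ => 1
  }

  /** Slots for the given parameter types, plus one for `this` and extraSlots (e.g. 1 for a row). */
  def calculate(paramTypes: Seq[String], extraSlots: Int = 0): Int =
    1 + extraSlots + paramTypes.map(slotsOf).sum

  /** True when a method with this many slots can still be declared. */
  def isValid(paramLength: Int): Boolean = paramLength <= MaxJvmParamSlots
}
```

Passing `extraSlots = 1` corresponds to the "extra unit for row" mentioned in the reply: the row becomes one more reference-typed parameter of the split-out function.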
// declaration.
val requireAllOutput = output.forall(parent.usedInputs.contains(_))
val consumeFunc =
  if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
super nit:
val confEnabled = SQLConf.get.wholeStageSplitConsumeFuncByOperator
if (confEnabled && ...)
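Putting the visible conditions together, the decision to split could be sketched as a single predicate. This is an illustration under assumptions rather than the merged code: the diff line above is cut off after requireAllOutput, so the parameter-length check here is taken from the earlier revision's use of ctx.isValidParamLength, attributes are modelled as plain strings, and SplitDecisionSketch is a hypothetical name.

```scala
object SplitDecisionSketch {
  // JVM limit on parameter slots for a single method.
  val MaxJvmParamSlots = 255

  def shouldSplit(
      confEnabled: Boolean,
      output: Seq[String],
      usedInputs: Set[String],
      paramLength: Int): Boolean = {
    // Variable evaluation cannot be deferred across a function call,
    // so the parent must use every output variable.
    val requireAllOutput = output.forall(usedInputs.contains)
    confEnabled && requireAllOutput && paramLength <= MaxJvmParamSlots
  }
}
```

Hoisting the config lookup into confEnabled, as the nit suggests, keeps the condition short enough to read at a glance.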
Test build #86615 has finished for PR 18931 at commit
Test build #86621 has finished for PR 18931 at commit
Test build #86622 has finished for PR 18931 at commit
LGTM, pending jenkins
Test build #86624 has finished for PR 18931 at commit
retest this please.
LGTM
Test build #86633 has finished for PR 18931 at commit
thanks, merging to master/2.3!
…n whole-stage codegen

## What changes were proposed in this pull request?

It has been observed in SPARK-21603 that whole-stage codegen suffers performance degradation if the generated functions are too long to be optimized by JIT.

Whole-stage codegen basically produces a single function that incorporates the generated code of all physical operators in the stage. The generated function can therefore grow past the size threshold beyond which JIT no longer optimizes it.

This patch decouples the logic of consuming rows in physical operators to avoid a giant row-processing function.

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18931 from viirya/SPARK-21717.

(cherry picked from commit d20bbc2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
It has been observed in SPARK-21603 that whole-stage codegen suffers performance degradation if the generated functions are too long to be optimized by JIT.
Whole-stage codegen basically produces a single function that incorporates the generated code of all physical operators in the stage. The generated function can therefore grow past the size threshold beyond which JIT no longer optimizes it.
This patch decouples the logic of consuming rows in physical operators to avoid a giant row-processing function.
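As a rough illustration of the idea, the sketch below emits Java source text in two layouts: one fused method, and one doConsume method per operator where each hands off to the next. It is a toy model, not Spark's generator: Op and ConsumeLayoutSketch are hypothetical names, each operator contributes a single statement, and argument passing between functions is omitted. For context, HotSpot by default does not JIT-compile methods whose bytecode exceeds 8000 bytes, which is the kind of threshold the description refers to.

```scala
// Hypothetical model: one operator contributes one row-processing statement.
final case class Op(prefix: String, stmt: String)

object ConsumeLayoutSketch {
  /** Fused layout: every operator's statement is inlined into one method. */
  def fused(ops: Seq[Op]): String = {
    val body = ops.map(op => s"  ${op.stmt}").mkString("\n")
    s"private void processNext() {\n$body\n}"
  }

  /** Split layout: one <prefix>_doConsume method per operator, chained by calls. */
  def split(ops: Seq[Op]): String = {
    require(ops.nonEmpty, "at least one operator is needed")
    val names = ops.map(op => s"${op.prefix}_doConsume")
    val funcs = ops.zipWithIndex.map { case (op, i) =>
      // Each function hands the row to the next operator's function, if any.
      val next = names.lift(i + 1).map(n => s"\n  $n();").getOrElse("")
      s"private void ${names(i)}() {\n  ${op.stmt}$next\n}"
    }
    val entry = s"private void processNext() {\n  ${names.head}();\n}"
    (entry +: funcs).mkString("\n\n")
  }
}
```

In the split layout no single method grows with the number of operators in the stage, so each one is more likely to stay under the JIT's size limit.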
How was this patch tested?
Added tests.