Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-18016][SQL][FOLLOW-UP] Code Generation: Constant Pool Limit - reduce entries for mutable state #20036

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class CodegenContext {

/**
* Returns the reference of next available slot in current compacted array. The size of each
* compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
* compacted array is controlled by the constant `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
* Once reaching the threshold, new compacted array is created.
*/
def getNextSlot(): String = {
Expand Down Expand Up @@ -299,7 +299,7 @@ class CodegenContext {
def initMutableStates(): String = {
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
val initCodes = mutableStateInitCode.distinct
val initCodes = mutableStateInitCode.distinct.map(_ + "\n")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good catch!


// The generated initialization code may exceed 64kb function size limit in JVM if there are too
// many mutable states, so split it into multiple functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ case class Like(left: Expression, right: Expression) extends StringRegexExpressi
if (rVal != null) {
val regexStr =
StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString()))
// inline mutable state since not many Like operations in a task
val pattern = ctx.addMutableState(patternClass, "patternLike",
v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true)
v => s"""$v = ${patternClass}.compile("$regexStr");""")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can remove { and } around patternClass.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done for other places, too.


// We don't use nullSafeCodeGen here because we don't want to re-evaluate right again.
val eval = left.genCode(ctx)
Expand Down Expand Up @@ -194,9 +193,8 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpress
if (rVal != null) {
val regexStr =
StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString())
// inline mutable state since not many RLike operations in a task
val pattern = ctx.addMutableState(patternClass, "patternRLike",
v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true)
v => s"""$v = ${patternClass}.compile("$regexStr");""")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.


// We don't use nullSafeCodeGen here because we don't want to re-evaluate right again.
val eval = left.genCode(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp

override def doProduce(ctx: CodegenContext): String = {
// Right now, InputAdapter is only used when there is one input RDD.
// inline mutable state since an inputAdaptor in a task
// inline mutable state since an InputAdapter in a task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you miss some words...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, done

val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
forceInline = true)
val row = ctx.freshName("row")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,20 +587,24 @@ case class HashAggregateExec(
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)

// inline mutable state since not many aggregation operations in a task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Inline

fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap",
v => s"$v = new $fastHashMapClassName();")
ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter")
v => s"$v = new $fastHashMapClassName();", forceInline = true)
ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter",
forceInline = true)
} else {
val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions,
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)

// inline mutable state since not many aggregation operations in a task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Inline

fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap",
v => s"$v = new $fastHashMapClassName(" +
s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());")
s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());",
forceInline = true)
ctx.addMutableState(
"org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>",
"fastHashMapIter")
"fastHashMapIter", forceInline = true)
}
}

Expand All @@ -611,7 +615,7 @@ case class HashAggregateExec(
// create hashMap
val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap",
v => s"$v = $thisPlan.createHashMap();")
v => s"$v = $thisPlan.createHashMap();", forceInline = true)
sorterTerm = ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, "sorter",
forceInline = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,9 @@ case class SortMergeJoinExec(
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

// inline mutable state since not many join operations in a task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Inline

val matches = ctx.addMutableState(clsName, "matches",
v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);")
v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
// Copy the left keys as class members so they could be used in next function call.
val matchedKeyVars = copyKeys(ctx, leftKeyVars)

Expand Down