-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18016][SQL] Code Generation: Constant Pool Limit - reduce entries for mutable state #19811
Changes from 23 commits
d441794
870d106
24d7087
3eb5842
074d711
eafa3f8
90d15f3
c456c07
9ca5ab3
5b36c61
fd51d75
effe918
9df109c
634d494
d3438fd
f4f3754
f1e1fca
0937ef2
4bfcc1a
49119a9
24f49c5
15e967e
d6c1a97
a9d40e9
31914c0
0e45c19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,21 +138,50 @@ class CodegenContext { | |
|
||
/** | ||
* Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a | ||
* 3-tuple: java type, variable name, code to init it. | ||
* As an example, ("int", "count", "count = 0;") will produce code: | ||
* 2-tuple: java type, variable name. | ||
* As an example, ("int", "count") will produce code: | ||
* {{{ | ||
* private int count; | ||
* }}} | ||
* as a member variable, and add | ||
* {{{ | ||
* count = 0; | ||
* }}} | ||
* to the constructor. | ||
* as a member variable | ||
* | ||
* They will be kept as member variables in generated classes like `SpecificProjection`. | ||
*/ | ||
val mutableStates: mutable.ArrayBuffer[(String, String, String)] = | ||
mutable.ArrayBuffer.empty[(String, String, String)] | ||
val mutableStates: mutable.ArrayBuffer[(String, String)] = | ||
mutable.ArrayBuffer.empty[(String, String)] | ||
|
||
// A map, keyed by mutable states' types, that holds the status of mutableStateArray | ||
val mutableStateArrayMap: mutable.Map[String, MutableStateArrays] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
mutable.Map.empty[String, MutableStateArrays] | ||
|
||
// An array that holds the code that will initialize each state | ||
val mutableStateInitCodes: mutable.ArrayBuffer[String] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: since code is uncountable, maybe we can rename to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right in English. We are seeing some There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it is a big deal, but I remember a note by @gatorsmile who advised not to use it anymore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea let's use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I would appreciate it if you put the link to a note |
||
mutable.ArrayBuffer.empty[String] | ||
|
||
// Holding names and current index of mutableStateArrays for a certain type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is too simple. Let's explain it more. |
||
class MutableStateArrays { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a java type string into this as parameter. So we can let the array name with type info? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to? the generated code looks like
So the type info is already there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, right. |
||
val arrayNames = mutable.ListBuffer.empty[String] | ||
createNewArray() | ||
|
||
private[this] var currentIndex = 0 | ||
|
||
private def createNewArray() = arrayNames.append(freshName("mutableStateArray")) | ||
|
||
def getCurrentIndex: Int = currentIndex | ||
|
||
def getNextSlot(): String = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. /**
* Returns the reference of next available slot in current compacted array. The size of each compacted array
* is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. Once reaching the
* threshold, new compacted array is created.
*/ |
||
if (currentIndex < CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT) { | ||
val res = s"${arrayNames.last}[$currentIndex]" | ||
currentIndex += 1 | ||
res | ||
} else { | ||
createNewArray() | ||
currentIndex = 1 | ||
s"${arrayNames.last}[0]" | ||
} | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Add a mutable state as a field to the generated class. c.f. the comments above. | ||
|
@@ -163,11 +192,49 @@ class CodegenContext { | |
* the list of default imports available. | ||
* Also, generic type arguments are accepted but ignored. | ||
* @param variableName Name of the field. | ||
* @param initCode The statement(s) to put into the init() method to initialize this field. | ||
* @param initFunc Function includes statement(s) to put into the init() method to initialize | ||
* this field. The argument is the name of the mutable state variable. | ||
* If left blank, the field will be default-initialized. | ||
* @param forceInline whether the declaration and initialization code may be inlined rather than | ||
* compacted. Please set `true` into forceInline, if you want to access the | ||
* status fast (e.g. frequently accessed) or if you want to use the original | ||
* variable name | ||
* @param useFreshName If false and the state is inlined, the given name is used unchanged | ||
* @return the name of the mutable state variable, which is either the original name if the | ||
* variable is inlined to the outer class, or an array access if the variable is to be | ||
* stored in an array of variables of the same type and initialization. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
* There are two use cases. One is to use the original name for global variable instead | ||
* of fresh name. Second is to use the original initialization statement since it is | ||
* complex (e.g. allocate multi-dimensional array or object constructor has variables). | ||
* Primitive type variables will be inlined into outer class when the total number of | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some sentences here are not related to |
||
* mutable variables is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`. | ||
* The max size of an array for compaction is given by | ||
* `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. | ||
*/ | ||
def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = { | ||
mutableStates += ((javaType, variableName, initCode)) | ||
def addMutableState( | ||
javaType: String, | ||
variableName: String, | ||
initFunc: String => String = _ => "", | ||
forceInline: Boolean = false, | ||
useFreshName: Boolean = true): String = { | ||
val varName = if (useFreshName) freshName(variableName) else variableName | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since I noticed that most of caller sides executes Do we need redundant code at caller side? WDYT? @cloud-fan There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't it an existing problem? Let's fix it in another PR to make this PR more consistent with the previous code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, let us discuss in another PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this can be moved in the if for clarity There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch |
||
|
||
// want to put a primitive type variable at outerClass for performance | ||
val canInlinePrimitive = isPrimitiveType(javaType) && | ||
(mutableStates.length < CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD) | ||
if (forceInline || canInlinePrimitive || javaType.contains("[][]")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain what conditions we do inline the state in comment. |
||
val initCode = initFunc(varName) | ||
mutableStates += ((javaType, varName)) | ||
mutableStateInitCodes += initCode | ||
varName | ||
} else { | ||
val arrays = mutableStateArrayMap.getOrElseUpdate(javaType, new MutableStateArrays) | ||
val element = arrays.getNextSlot() | ||
|
||
val initCode = initFunc(element) | ||
mutableStateInitCodes += initCode | ||
element | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -176,8 +243,7 @@ class CodegenContext { | |
* data types like: UTF8String, ArrayData, MapData & InternalRow. | ||
*/ | ||
def addBufferedState(dataType: DataType, variableName: String, initCode: String): ExprCode = { | ||
val value = freshName(variableName) | ||
addMutableState(javaType(dataType), value, "") | ||
val value = addMutableState(javaType(dataType), variableName) | ||
val code = dataType match { | ||
case StringType => s"$value = $initCode.clone();" | ||
case _: StructType | _: ArrayType | _: MapType => s"$value = $initCode.copy();" | ||
|
@@ -189,15 +255,37 @@ class CodegenContext { | |
def declareMutableStates(): String = { | ||
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in | ||
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones. | ||
mutableStates.distinct.map { case (javaType, variableName, _) => | ||
val inlinedStates = mutableStates.distinct.map { case (javaType, variableName) => | ||
s"private $javaType $variableName;" | ||
}.mkString("\n") | ||
} | ||
|
||
val arrayStates = mutableStateArrayMap.flatMap { case (javaType, mutableStateArrays) => | ||
val numArrays = mutableStateArrays.arrayNames.size | ||
mutableStateArrays.arrayNames.zipWithIndex.map { case (arrayName, index) => | ||
val length = if (index + 1 == numArrays) { | ||
mutableStateArrays.getCurrentIndex | ||
} else { | ||
CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why can't we always use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one type can have multiple arrays if there are many global variables of this type. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry, stupid question. |
||
} | ||
if (javaType.contains("[]")) { | ||
// initializer had a one-dimensional array variable | ||
val baseType = javaType.substring(0, javaType.length - 2) | ||
s"private $javaType[] $arrayName = new $baseType[$length][];" | ||
} else { | ||
// initializer had a scalar variable | ||
s"private $javaType[] $arrayName = new $javaType[$length];" | ||
} | ||
} | ||
} | ||
|
||
(inlinedStates ++ arrayStates).mkString("\n") | ||
} | ||
|
||
def initMutableStates(): String = { | ||
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in | ||
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones. | ||
val initCodes = mutableStates.distinct.map(_._3 + "\n") | ||
val initCodes = mutableStateInitCodes.distinct | ||
|
||
// The generated initialization code may exceed 64kb function size limit in JVM if there are too | ||
// many mutable states, so split it into multiple functions. | ||
splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil) | ||
|
@@ -1039,9 +1127,8 @@ class CodegenContext { | |
// 2. Less code. | ||
// Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with | ||
// at least two nodes) as the cost of doing it is expected to be low. | ||
addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;") | ||
addMutableState(javaType(expr.dataType), value, | ||
s"$value = ${defaultValue(expr.dataType)};") | ||
addMutableState(JAVA_BOOLEAN, isNull, forceInline = true, useFreshName = false) | ||
addMutableState(javaType(expr.dataType), value, forceInline = true, useFreshName = false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we do
at the beginning? |
||
|
||
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);" | ||
val state = SubExprEliminationState(isNull, value) | ||
|
@@ -1165,6 +1252,15 @@ object CodeGenerator extends Logging { | |
// class. | ||
val GENERATED_CLASS_SIZE_THRESHOLD = 1000000 | ||
|
||
// This is the threshold for the number of global variables, whose types are primitive type or | ||
// complex type (e.g. more than one-dimensional array), that will be placed at the outer class | ||
val OUTER_CLASS_VARIABLES_THRESHOLD = 10000 | ||
|
||
// This is the maximum number of array elements to keep global variables in one Java array. | ||
// 32767 is the maximum integer value that does not require a constant pool entry in a Java | ||
// bytecode instruction; with this limit the largest element index used is limit - 1 = 32767. | ||
val MUTABLESTATEARRAY_SIZE_LIMIT = 32768 | ||
|
||
/** | ||
* Compile the Java source code into a Java class, using Janino. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,41 +57,35 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP | |
case _ => true | ||
}.unzip | ||
val exprVals = ctx.generateExpressions(validExpr, useSubexprElimination) | ||
val projectionCodes = exprVals.zip(index).map { | ||
val projectionCodes: Seq[(String, String, String, Int)] = exprVals.zip(index).map { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a comment to explain this tuple-4 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why needs to return column index? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. |
||
case (ev, i) => | ||
val e = expressions(i) | ||
val value = ctx.addMutableState(ctx.javaType(e.dataType), "value") | ||
if (e.nullable) { | ||
val isNull = s"isNull_$i" | ||
val value = s"value_$i" | ||
ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull, s"$isNull = true;") | ||
ctx.addMutableState(ctx.javaType(e.dataType), value, | ||
s"$value = ${ctx.defaultValue(e.dataType)};") | ||
s""" | ||
${ev.code} | ||
$isNull = ${ev.isNull}; | ||
$value = ${ev.value}; | ||
""" | ||
val isNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "isNull") | ||
(s""" | ||
|${ev.code} | ||
|$isNull = ${ev.isNull}; | ||
|$value = ${ev.value}; | ||
""".stripMargin, isNull, value, i) | ||
} else { | ||
val value = s"value_$i" | ||
ctx.addMutableState(ctx.javaType(e.dataType), value, | ||
s"$value = ${ctx.defaultValue(e.dataType)};") | ||
s""" | ||
${ev.code} | ||
$value = ${ev.value}; | ||
""" | ||
(s""" | ||
|${ev.code} | ||
|$value = ${ev.value}; | ||
""".stripMargin, ev.isNull, value, i) | ||
} | ||
} | ||
|
||
// Evaluate all the subexpressions. | ||
val evalSubexpr = ctx.subexprFunctions.mkString("\n") | ||
|
||
val updates = validExpr.zip(index).map { | ||
case (e, i) => | ||
val ev = ExprCode("", s"isNull_$i", s"value_$i") | ||
val updates = validExpr.zip(projectionCodes).map { | ||
case (e, (_, isNull, value, i)) => | ||
val ev = ExprCode("", isNull, value) | ||
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable) | ||
} | ||
|
||
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes) | ||
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1)) | ||
val allUpdates = ctx.splitExpressionsWithCurrentInputs(updates) | ||
|
||
val codeBody = s""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename it to
inlinedMutableStates
?