Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-18016][SQL] Code Generation: Constant Pool Limit - reduce entries for mutable state #19811

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d441794
update API of addMutableState
kiszk Nov 24, 2017
870d106
eliminate initialization with default value
kiszk Nov 25, 2017
24d7087
allocate a global Java array to store a lot of mutable state in a class
kiszk Nov 26, 2017
3eb5842
fix scala style error
kiszk Nov 26, 2017
074d711
fix test failure of ExpressionEncoderSuite.NestedArray
kiszk Nov 27, 2017
eafa3f8
rebase with master
kiszk Nov 30, 2017
90d15f3
address review comments
kiszk Nov 30, 2017
c456c07
add useFreshname parameter to addMutableState
kiszk Nov 30, 2017
9ca5ab3
rebase with master
kiszk Nov 30, 2017
5b36c61
fix test failures
kiszk Dec 1, 2017
fd51d75
drop to creat a loop for initialization
kiszk Dec 7, 2017
effe918
fix test failure
kiszk Dec 8, 2017
9df109c
update comments
kiszk Dec 8, 2017
634d494
address review comment
kiszk Dec 10, 2017
d3438fd
address review comment
kiszk Dec 12, 2017
f4f3754
address review comment
kiszk Dec 12, 2017
f1e1fca
address review comments except test case
kiszk Dec 13, 2017
0937ef2
rebase with master
kiszk Dec 13, 2017
4bfcc1a
Do not use compaction as possible for frequently-accessed variable
kiszk Dec 13, 2017
49119a9
exclude mutable state from argument list for ExpressionCodegn
kiszk Dec 13, 2017
24f49c5
fix test failures
kiszk Dec 14, 2017
15e967e
address review comments
kiszk Dec 14, 2017
d6c1a97
address review comments
kiszk Dec 14, 2017
a9d40e9
address review comments
kiszk Dec 14, 2017
31914c0
address review comments
kiszk Dec 15, 2017
0e45c19
address review comments
kiszk Dec 19, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ abstract class Expression extends TreeNode[Expression] {
// TODO: support whole stage codegen too
if (eval.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {
val setIsNull = if (eval.isNull != "false" && eval.isNull != "true") {
val globalIsNull = ctx.freshName("globalIsNull")
ctx.addMutableState(ctx.JAVA_BOOLEAN, globalIsNull)
val globalIsNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "globalIsNull")
val localIsNull = eval.isNull
eval.isNull = globalIsNull
s"$globalIsNull = $localIsNull;"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterminis
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val countTerm = ctx.freshName("count")
val partitionMaskTerm = ctx.freshName("partitionMask")
ctx.addMutableState(ctx.JAVA_LONG, countTerm)
ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm)
val countTerm = ctx.addMutableState(ctx.JAVA_LONG, "count")
val partitionMaskTerm = ctx.addMutableState(ctx.JAVA_LONG, "partitionMask")
ctx.addPartitionInitializationStatement(s"$countTerm = 0L;")
ctx.addPartitionInitializationStatement(s"$partitionMaskTerm = ((long) partitionIndex) << 33;")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ case class SparkPartitionID() extends LeafExpression with Nondeterministic {
override protected def evalInternal(input: InternalRow): Int = partitionId

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val idTerm = ctx.freshName("partitionId")
ctx.addMutableState(ctx.JAVA_INT, idTerm)
val idTerm = ctx.addMutableState(ctx.JAVA_INT, "partitionId")
ctx.addPartitionInitializationStatement(s"$idTerm = partitionIndex;")
ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTerm;", isNull = "false")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,7 @@ case class Least(children: Seq[Expression]) extends Expression {

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val evalChildren = children.map(_.genCode(ctx))
val tmpIsNull = ctx.freshName("leastTmpIsNull")
ctx.addMutableState(ctx.JAVA_BOOLEAN, tmpIsNull)
val tmpIsNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "leastTmpIsNull")
val evals = evalChildren.map(eval =>
s"""
|${eval.code}
Expand Down Expand Up @@ -683,8 +682,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val evalChildren = children.map(_.genCode(ctx))
val tmpIsNull = ctx.freshName("greatestTmpIsNull")
ctx.addMutableState(ctx.JAVA_BOOLEAN, tmpIsNull)
val tmpIsNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "greatestTmpIsNull")
val evals = evalChildren.map(eval =>
s"""
|${eval.code}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,63 @@ class CodegenContext {
var currentVars: Seq[ExprCode] = null

/**
* Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a
* 3-tuple: java type, variable name, code to init it.
* As an example, ("int", "count", "count = 0;") will produce code:
* Holding expressions' inlined mutable states like `MonotonicallyIncreasingID.count` as a
* 2-tuple: java type, variable name.
* As an example, ("int", "count") will produce code:
* {{{
* private int count;
* }}}
* as a member variable, and add
* {{{
* count = 0;
* }}}
* to the constructor.
* as a member variable
*
* They will be kept as member variables in generated classes like `SpecificProjection`.
*/
val mutableStates: mutable.ArrayBuffer[(String, String, String)] =
mutable.ArrayBuffer.empty[(String, String, String)]
val inlinedMutableStates: mutable.ArrayBuffer[(String, String)] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment explaining this stores only mutable states inlined in outer class.

mutable.ArrayBuffer.empty[(String, String)]

/**
* The mapping between mutable state types and corrseponding compacted arrays.
* The keys are java type string. The values are [[MutableStateArrays]] which encapsulates
* the compacted arrays for the mutable states with the same java type.
*/
val arrayCompactedMutableStates: mutable.Map[String, MutableStateArrays] =
mutable.Map.empty[String, MutableStateArrays]

// An array holds the code that will initialize each state
val mutableStateInitCode: mutable.ArrayBuffer[String] =
mutable.ArrayBuffer.empty[String]

/**
* This class holds a set of names of mutableStateArrays that is used for compacting mutable
* states for a certain type, and holds the next available slot of the current compacted array.
*/
class MutableStateArrays {
val arrayNames = mutable.ListBuffer.empty[String]
createNewArray()

private[this] var currentIndex = 0

private def createNewArray() = arrayNames.append(freshName("mutableStateArray"))

def getCurrentIndex: Int = currentIndex

/**
* Returns the reference of next available slot in current compacted array. The size of each
* compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT is not a config

* Once reaching the threshold, new compacted array is created.
*/
def getNextSlot(): String = {
if (currentIndex < CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT) {
val res = s"${arrayNames.last}[$currentIndex]"
currentIndex += 1
res
} else {
createNewArray()
currentIndex = 1
s"${arrayNames.last}[0]"
}
}

}

/**
* Add a mutable state as a field to the generated class. c.f. the comments above.
Expand All @@ -163,11 +204,52 @@ class CodegenContext {
* the list of default imports available.
* Also, generic type arguments are accepted but ignored.
* @param variableName Name of the field.
* @param initCode The statement(s) to put into the init() method to initialize this field.
* @param initFunc Function includes statement(s) to put into the init() method to initialize
* this field. The argument is the name of the mutable state variable.
* If left blank, the field will be default-initialized.
* @param forceInline whether the declaration and initialization code may be inlined rather than
* compacted. Please set `true` into forceInline for one of the followings:
* 1. use the original name of the status
* 2. expect to non-frequently generate the status
* (e.g. not much sort operators in one stage)
* @param useFreshName If this is false and the mutable state ends up inlining in the outer
* class, the name is not changed
* @return the name of the mutable state variable, which is the original name or fresh name if
* the variable is inlined to the outer class, or an array access if the variable is to
* be stored in an array of variables of the same type.
* A variable will be inlined into the outer class when one of the following conditions
* are satisfied:
* 1. forceInline is true
* 2. its type is primitive type and the total number of the inlined mutable variables
* is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`
* 3. its type is multi-dimensional array
* When a variable is compacted into an array, the max size of the array for compaction
* is given by `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
*/
def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = {
mutableStates += ((javaType, variableName, initCode))
def addMutableState(
javaType: String,
variableName: String,
initFunc: String => String = _ => "",
forceInline: Boolean = false,
useFreshName: Boolean = true): String = {

// want to put a primitive type variable at outerClass for performance
val canInlinePrimitive = isPrimitiveType(javaType) &&
(inlinedMutableStates.length < CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD)
if (forceInline || canInlinePrimitive || javaType.contains("[][]")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain what conditions we do inline the state in comment.

val varName = if (useFreshName) freshName(variableName) else variableName
val initCode = initFunc(varName)
inlinedMutableStates += ((javaType, varName))
mutableStateInitCode += initCode
varName
} else {
val arrays = arrayCompactedMutableStates.getOrElseUpdate(javaType, new MutableStateArrays)
val element = arrays.getNextSlot()

val initCode = initFunc(element)
mutableStateInitCode += initCode
element
}
}

/**
Expand All @@ -176,8 +258,7 @@ class CodegenContext {
* data types like: UTF8String, ArrayData, MapData & InternalRow.
*/
def addBufferedState(dataType: DataType, variableName: String, initCode: String): ExprCode = {
val value = freshName(variableName)
addMutableState(javaType(dataType), value, "")
val value = addMutableState(javaType(dataType), variableName)
val code = dataType match {
case StringType => s"$value = $initCode.clone();"
case _: StructType | _: ArrayType | _: MapType => s"$value = $initCode.copy();"
Expand All @@ -189,15 +270,37 @@ class CodegenContext {
def declareMutableStates(): String = {
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
mutableStates.distinct.map { case (javaType, variableName, _) =>
val inlinedStates = inlinedMutableStates.distinct.map { case (javaType, variableName) =>
s"private $javaType $variableName;"
}.mkString("\n")
}

val arrayStates = arrayCompactedMutableStates.flatMap { case (javaType, mutableStateArrays) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that we add same mutable state to arrayCompactedMutableStates like inlinedMutableStates? We can't just do distinct like inlinedMutableStates.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would arrayCompactedMutableStates has duplications?

val numArrays = mutableStateArrays.arrayNames.size
mutableStateArrays.arrayNames.zipWithIndex.map { case (arrayName, index) =>
val length = if (index + 1 == numArrays) {
mutableStateArrays.getCurrentIndex
} else {
CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we always use mutableStateArrays.getCurrentIndex?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one type can have multiple arrays if there are many global variables of this type. The getCurrentIndex is the index of the array to be filled, and other arrays are already full and their size is MUTABLESTATEARRAY_SIZE_LIMIT.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, stupid question.

}
if (javaType.contains("[]")) {
// initializer had an one-dimensional array variable
val baseType = javaType.substring(0, javaType.length - 2)
s"private $javaType[] $arrayName = new $baseType[$length][];"
} else {
// initializer had a scalar variable
s"private $javaType[] $arrayName = new $javaType[$length];"
}
}
}

(inlinedStates ++ arrayStates).mkString("\n")
}

def initMutableStates(): String = {
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
val initCodes = mutableStates.distinct.map(_._3 + "\n")
val initCodes = mutableStateInitCode.distinct

// The generated initialization code may exceed 64kb function size limit in JVM if there are too
// many mutable states, so split it into multiple functions.
splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil)
Expand Down Expand Up @@ -1011,9 +1114,9 @@ class CodegenContext {
val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1)
commonExprs.foreach { e =>
val expr = e.head
val fnName = freshName("evalExpr")
val isNull = s"${fnName}IsNull"
val value = s"${fnName}Value"
val fnName = freshName("subExpr")
val isNull = addMutableState(JAVA_BOOLEAN, "subExprIsNull")
val value = addMutableState(javaType(expr.dataType), "subExprValue")

// Generate the code for this expression tree and wrap it in a function.
val eval = expr.genCode(this)
Expand All @@ -1039,9 +1142,6 @@ class CodegenContext {
// 2. Less code.
// Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with
// at least two nodes) as the cost of doing it is expected to be low.
addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;")
addMutableState(javaType(expr.dataType), value,
s"$value = ${defaultValue(expr.dataType)};")

subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
Expand Down Expand Up @@ -1165,6 +1265,15 @@ object CodeGenerator extends Logging {
// class.
val GENERATED_CLASS_SIZE_THRESHOLD = 1000000

// This is the threshold for the number of global variables, whose types are primitive type or
// complex type (e.g. more than one-dimensional array), that will be placed at the outer class
val OUTER_CLASS_VARIABLES_THRESHOLD = 10000

// This is the maximum number of array elements to keep global variables in one Java array
// 32767 is the maximum integer value that does not require a constant pool entry in a Java
// bytecode instruction
val MUTABLESTATEARRAY_SIZE_LIMIT = 32768

/**
* Compile the Java source code into a Java class, using Janino.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,41 +57,37 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
case _ => true
}.unzip
val exprVals = ctx.generateExpressions(validExpr, useSubexprElimination)
val projectionCodes = exprVals.zip(index).map {

// 4-tuples: (code for projection, isNull variable name, value variable name, column index)
val projectionCodes: Seq[(String, String, String, Int)] = exprVals.zip(index).map {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why needs to return column index?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

case (ev, i) =>
val e = expressions(i)
val value = ctx.addMutableState(ctx.javaType(e.dataType), "value")
if (e.nullable) {
val isNull = s"isNull_$i"
val value = s"value_$i"
ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull, s"$isNull = true;")
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
$isNull = ${ev.isNull};
$value = ${ev.value};
"""
val isNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "isNull")
(s"""
|${ev.code}
|$isNull = ${ev.isNull};
|$value = ${ev.value};
""".stripMargin, isNull, value, i)
} else {
val value = s"value_$i"
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
$value = ${ev.value};
"""
(s"""
|${ev.code}
|$value = ${ev.value};
""".stripMargin, ev.isNull, value, i)
}
}

// Evaluate all the subexpressions.
val evalSubexpr = ctx.subexprFunctions.mkString("\n")

val updates = validExpr.zip(index).map {
case (e, i) =>
val ev = ExprCode("", s"isNull_$i", s"value_$i")
val updates = validExpr.zip(projectionCodes).map {
case (e, (_, isNull, value, i)) =>
val ev = ExprCode("", isNull, value)
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}

val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes)
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1))
val allUpdates = ctx.splitExpressionsWithCurrentInputs(updates)

val codeBody = s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
bufferHolder: String,
isTopLevel: Boolean = false): String = {
val rowWriterClass = classOf[UnsafeRowWriter].getName
val rowWriter = ctx.freshName("rowWriter")
ctx.addMutableState(rowWriterClass, rowWriter,
s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
val rowWriter = ctx.addMutableState(rowWriterClass, "rowWriter",
v => s"$v = new $rowWriterClass($bufferHolder, ${inputs.length});")

val resetWriter = if (isTopLevel) {
// For top level row writer, it always writes to the beginning of the global buffer holder,
Expand Down Expand Up @@ -186,9 +185,8 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
// Puts `input` in a local variable to avoid to re-evaluate it if it's a statement.
val tmpInput = ctx.freshName("tmpInput")
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
val arrayWriter = ctx.freshName("arrayWriter")
ctx.addMutableState(arrayWriterClass, arrayWriter,
s"$arrayWriter = new $arrayWriterClass();")
val arrayWriter = ctx.addMutableState(arrayWriterClass, "arrayWriter",
v => s"$v = new $arrayWriterClass();")
val numElements = ctx.freshName("numElements")
val index = ctx.freshName("index")

Expand Down Expand Up @@ -318,13 +316,12 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
case _ => true
}

val result = ctx.freshName("result")
ctx.addMutableState("UnsafeRow", result, s"$result = new UnsafeRow(${expressions.length});")
val result = ctx.addMutableState("UnsafeRow", "result",
v => s"$v = new UnsafeRow(${expressions.length});")

val holder = ctx.freshName("holder")
val holderClass = classOf[BufferHolder].getName
ctx.addMutableState(holderClass, holder,
s"$holder = new $holderClass($result, ${numVarLenFields * 32});")
val holder = ctx.addMutableState(holderClass, "holder",
v => s"$v = new $holderClass($result, ${numVarLenFields * 32});")

val resetBufferHolder = if (numVarLenFields == 0) {
""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ case class CaseWhen(
// It is initialized to `NOT_MATCHED`, and if it's set to `HAS_NULL` or `HAS_NONNULL`,
// We won't go on anymore on the computation.
val resultState = ctx.freshName("caseWhenResultState")
val tmpResult = ctx.freshName("caseWhenTmpResult")
ctx.addMutableState(ctx.javaType(dataType), tmpResult)
val tmpResult = ctx.addMutableState(ctx.javaType(dataType), "caseWhenTmpResult")

// these blocks are meant to be inside a
// do {
Expand Down
Loading