Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
kiszk committed Dec 20, 2017
1 parent ef10f45 commit 53661eb
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class CodegenContext {

/**
* Returns the reference of next available slot in current compacted array. The size of each
* compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
* compacted array is controlled by the constant `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
* Once the threshold is reached, a new compacted array is created.
*/
def getNextSlot(): String = {
Expand Down Expand Up @@ -299,7 +299,7 @@ class CodegenContext {
def initMutableStates(): String = {
// It's possible that we add the same mutable state twice, e.g. the `mergeExpressions` in
// `TypedAggregateExpression`, so we call `distinct` here to remove the duplicates.
val initCodes = mutableStateInitCode.distinct
val initCodes = mutableStateInitCode.distinct.map(_ + "\n")

// The generated initialization code may exceed the JVM's 64KB function size limit if there are
// too many mutable states, so split it into multiple functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ case class Like(left: Expression, right: Expression) extends StringRegexExpressi
if (rVal != null) {
val regexStr =
StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString()))
// inline mutable state since not many Like operations in a task
val pattern = ctx.addMutableState(patternClass, "patternLike",
v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true)
v => s"""$v = ${patternClass}.compile("$regexStr");""")

// We don't use nullSafeCodeGen here because we don't want to re-evaluate right again.
val eval = left.genCode(ctx)
Expand Down Expand Up @@ -194,9 +193,8 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpress
if (rVal != null) {
val regexStr =
StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString())
// inline mutable state since not many RLike operations in a task
val pattern = ctx.addMutableState(patternClass, "patternRLike",
v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true)
v => s"""$v = ${patternClass}.compile("$regexStr");""")

// We don't use nullSafeCodeGen here because we don't want to re-evaluate right again.
val eval = left.genCode(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp

override def doProduce(ctx: CodegenContext): String = {
// Right now, InputAdapter is only used when there is one input RDD.
// inline mutable state since an inputAdaptor in a task
// inline mutable state since an InputAdapter is used only once in a task
val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
forceInline = true)
val row = ctx.freshName("row")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,20 +587,24 @@ case class HashAggregateExec(
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)

// inline mutable state since there are not many aggregation operations in a task
fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap",
v => s"$v = new $fastHashMapClassName();")
ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter")
v => s"$v = new $fastHashMapClassName();", forceInline = true)
ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter",
forceInline = true)
} else {
val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions,
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)

// inline mutable state since there are not many aggregation operations in a task
fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap",
v => s"$v = new $fastHashMapClassName(" +
s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());")
s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());",
forceInline = true)
ctx.addMutableState(
"org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>",
"fastHashMapIter")
"fastHashMapIter", forceInline = true)
}
}

Expand All @@ -611,7 +615,7 @@ case class HashAggregateExec(
// create hashMap
val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap",
v => s"$v = $thisPlan.createHashMap();")
v => s"$v = $thisPlan.createHashMap();", forceInline = true)
sorterTerm = ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, "sorter",
forceInline = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,9 @@ case class SortMergeJoinExec(
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

// inline mutable state since there are not many join operations in a task
val matches = ctx.addMutableState(clsName, "matches",
v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);")
v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
// Copy the left keys as class members so they could be used in next function call.
val matchedKeyVars = copyKeys(ctx, leftKeyVars)

Expand Down

0 comments on commit 53661eb

Please sign in to comment.