Skip to content

Commit

Permalink
address review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
kiszk committed Dec 27, 2017
1 parent 05da9d7 commit 4ef81c8
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ case class SortExec(
// Initialize the class member variables. This includes the instance of the Sorter and
// the iterator to return sorted rows.
val thisPlan = ctx.addReferenceObj("plan", this)
// inline mutable state since not many Sort operations in a task
// Inline mutable state since not many Sort operations in a task
sorterVariable = ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, "sorter",
v => s"$v = $thisPlan.createSorter();", forceInline = true)
val metrics = ctx.addMutableState(classOf[TaskMetrics].getName, "metrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp

override def doProduce(ctx: CodegenContext): String = {
// Right now, InputAdapter is only used when there is one input RDD.
// inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen
// Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen
val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
forceInline = true)
val row = ctx.freshName("row")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ case class HashAggregateExec(
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)

// inline mutable state since not many aggregation operations in a task
// Inline mutable state since not many aggregation operations in a task
fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap",
v => s"$v = new $fastHashMapClassName();", forceInline = true)
ctx.addMutableState(s"java.util.Iterator<InternalRow>", "vectorizedFastHashMapIter",
Expand All @@ -597,7 +597,7 @@ case class HashAggregateExec(
fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
ctx.addInnerClass(generatedMap)

// inline mutable state since not many aggregation operations in a task
// Inline mutable state since not many aggregation operations in a task
fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap",
v => s"$v = new $fastHashMapClassName(" +
s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());",
Expand All @@ -609,7 +609,7 @@ case class HashAggregateExec(
}

// Create a name for the iterator from the regular hash map.
// inline mutable state since not many aggregation operations in a task
// Inline mutable state since not many aggregation operations in a task
val iterTerm = ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName,
"mapIter", forceInline = true)
// create hashMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ case class SampleExec(
val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
val initSampler = ctx.freshName("initSampler")

// inline mutable state since not many Sample operations in a task
// Inline mutable state since not many Sample operations in a task
val sampler = ctx.addMutableState(s"$samplerClass<UnsafeRow>", "sampleReplace",
v => {
val initSamplerFuncName = ctx.addNewFunction(initSampler,
Expand Down Expand Up @@ -371,7 +371,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val ev = ExprCode("", "false", value)
val BigInt = classOf[java.math.BigInteger].getName

// inline mutable state since not many Range operations in a task
// Inline mutable state since not many Range operations in a task
val taskContext = ctx.addMutableState("TaskContext", "taskContext",
v => s"$v = TaskContext.get();", forceInline = true)
val inputMetrics = ctx.addMutableState("InputMetrics", "inputMetrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ case class BroadcastHashJoinExec(
// At the end of the task, we update the avg hash probe.
val avgHashProbe = metricTerm(ctx, "avgHashProbe")

// inline mutable state since not many join operations in a task
// Inline mutable state since not many join operations in a task
val relationTerm = ctx.addMutableState(clsName, "relation",
v => s"""
| $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ case class SortMergeJoinExec(
*/
private def genScanner(ctx: CodegenContext): (String, String) = {
// Create class member for next row from both sides.
// inline mutable state since not many join operations in a task
// Inline mutable state since not many join operations in a task
val leftRow = ctx.addMutableState("InternalRow", "leftRow", forceInline = true)
val rightRow = ctx.addMutableState("InternalRow", "rightRow", forceInline = true)

Expand All @@ -440,7 +440,7 @@ case class SortMergeJoinExec(
val spillThreshold = getSpillThreshold
val inMemoryThreshold = getInMemoryThreshold

// inline mutable state since not many join operations in a task
// Inline mutable state since not many join operations in a task
val matches = ctx.addMutableState(clsName, "matches",
v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
// Copy the left keys as class members so they could be used in next function call.
Expand Down Expand Up @@ -577,7 +577,7 @@ case class SortMergeJoinExec(
override def needCopyResult: Boolean = true

override def doProduce(ctx: CodegenContext): String = {
// inline mutable state since not many join operations in a task
// Inline mutable state since not many join operations in a task
val leftInput = ctx.addMutableState("scala.collection.Iterator", "leftInput",
v => s"$v = inputs[0];", forceInline = true)
val rightInput = ctx.addMutableState("scala.collection.Iterator", "rightInput",
Expand Down

0 comments on commit 4ef81c8

Please sign in to comment.