Skip to content

Commit

Permalink
[SPARK-14138] [SQL] Fix generated SpecificColumnarIterator code can e…
Browse files Browse the repository at this point in the history
…xceed JVM size limit for cached DataFrames

## What changes were proposed in this pull request?

This PR reduces Java byte code size of method in ```SpecificColumnarIterator``` by using two approaches:
1. Generate and call ```getTYPEColumnAccessor()``` for each type, which is actually used, for instantiating accessors
2. Group a lot of method calls (more than 4000) into a method

## How was this patch tested?

Added a new unit test to ```InMemoryColumnarQuerySuite```

Here is generate code

```java
/* 033 */   private org.apache.spark.sql.execution.columnar.CachedBatch batch = null;
/* 034 */
/* 035 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor;
/* 036 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor1;
/* 037 */
/* 038 */   public SpecificColumnarIterator() {
/* 039 */     this.nativeOrder = ByteOrder.nativeOrder();
/* 030 */     this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 041 */   }
/* 042 */
/* 043 */   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes,
/* 044 */     boolean columnNullables[]) {
/* 044 */     this.input = input;
/* 046 */     this.columnTypes = columnTypes;
/* 047 */     this.columnIndexes = columnIndexes;
/* 048 */   }
/* 049 */
/* 050 */
/* 051 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor getIntColumnAccessor(int idx) {
/* 052 */     byte[] buffer = batch.buffers()[columnIndexes[idx]];
/* 053 */     return new org.apache.spark.sql.execution.columnar.IntColumnAccessor(ByteBuffer.wrap(buffer).order(nativeOrder));
/* 054 */   }
/* 055 */
/* 056 */
/* 057 */
/* 058 */
/* 059 */
/* 060 */
/* 061 */   public boolean hasNext() {
/* 062 */     if (currentRow < numRowsInBatch) {
/* 063 */       return true;
/* 064 */     }
/* 065 */     if (!input.hasNext()) {
/* 066 */       return false;
/* 067 */     }
/* 068 */
/* 069 */     batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
/* 070 */     currentRow = 0;
/* 071 */     numRowsInBatch = batch.numRows();
/* 072 */     accessor = getIntColumnAccessor(0);
/* 073 */     accessor1 = getIntColumnAccessor(1);
/* 074 */
/* 075 */     return hasNext();
/* 076 */   }
/* 077 */
/* 078 */   public InternalRow next() {
/* 079 */     currentRow += 1;
/* 080 */     bufferHolder.reset();
/* 081 */     rowWriter.zeroOutNullBytes();
/* 082 */     accessor.extractTo(mutableRow, 0);
/* 083 */     accessor1.extractTo(mutableRow, 1);
/* 084 */     unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 085 */     return unsafeRow;
/* 086 */   }
```

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #11984 from kiszk/SPARK-14138.
  • Loading branch information
kiszk authored and davies committed Mar 31, 2016
1 parent 3cc3d85 commit f12f11e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.columnar

import scala.collection.mutable

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -88,7 +90,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
case array: ArrayType => classOf[ArrayColumnAccessor].getName
case t: MapType => classOf[MapColumnAccessor].getName
}
ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
ctx.addMutableState(accessorCls, accessorName, "")

val createCode = dt match {
case t if ctx.isPrimitiveType(dt) =>
Expand All @@ -97,7 +99,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
case other =>
s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
(${dt.getClass.getName}) columnTypes[$index]);"""
(${dt.getClass.getName}) columnTypes[$index]);"""
}

val extract = s"$accessorName.extractTo(mutableRow, $index);"
Expand All @@ -114,6 +116,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
(createCode, extract + patch)
}.unzip

/*
* 200 = 6000 bytes / 30 (up to 30 bytes per one call))
* the maximum byte code size to be compiled for HotSpot is 8000.
* We should keep less than 8000
*/
val numberOfStatementsThreshold = 200
val (initializerAccessorCalls, extractorCalls) =
if (initializeAccessors.length <= numberOfStatementsThreshold) {
(initializeAccessors.mkString("\n"), extractors.mkString("\n"))
} else {
val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
var groupedAccessorsLength = 0
groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
groupedAccessorsLength += 1
val funcName = s"accessors$i"
val funcCode = s"""
|private void $funcName() {
| ${body.mkString("\n")}
|}
""".stripMargin
ctx.addNewFunction(funcName, funcCode)
}
groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
val funcName = s"extractors$i"
val funcCode = s"""
|private void $funcName() {
| ${body.mkString("\n")}
|}
""".stripMargin
ctx.addNewFunction(funcName, funcCode)
}
((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
(0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
}

val code = s"""
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -149,8 +187,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[${columnTypes.length}][];
this.mutableRow = new MutableUnsafeRow(rowWriter);

${initMutableStates(ctx)}
}

public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
Expand All @@ -159,6 +195,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.columnIndexes = columnIndexes;
}

${declareAddedFunctions(ctx)}

public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
Expand All @@ -173,7 +211,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
${initializeAccessors.mkString("\n")}
${initializerAccessorCalls}

return hasNext();
}
Expand All @@ -182,7 +220,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
currentRow += 1;
bufferHolder.reset();
rowWriter.initialize(bufferHolder, $numFields);
${extractors.mkString("\n")}
${extractorCalls}
unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize());
return unsafeRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
assert(data.count() === 10)
assert(data.filter($"s" === "3").count() === 1)
}

test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
val length1 = 3999
val columnTypes1 = List.fill(length1)(IntegerType)
val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)

val length2 = 10000
val columnTypes2 = List.fill(length2)(IntegerType)
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
}
}

0 comments on commit f12f11e

Please sign in to comment.