From f12f11e578169b47e3f8b18b299948c0670ba585 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 31 Mar 2016 15:05:48 -0700 Subject: [PATCH] [SPARK-14138] [SQL] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames ## What changes were proposed in this pull request? This PR reduces Java byte code size of method in ```SpecificColumnarIterator``` by using two approaches: 1. Generate and call ```getTYPEColumnAccessor()``` for each type, which is actually used, for instantiating accessors 2. Group a lot of method calls (more than 4000) into a method ## How was this patch tested? Added a new unit test to ```InMemoryColumnarQuerySuite``` Here is generate code ```java /* 033 */ private org.apache.spark.sql.execution.columnar.CachedBatch batch = null; /* 034 */ /* 035 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor; /* 036 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor1; /* 037 */ /* 038 */ public SpecificColumnarIterator() { /* 039 */ this.nativeOrder = ByteOrder.nativeOrder(); /* 030 */ this.mutableRow = new MutableUnsafeRow(rowWriter); /* 041 */ } /* 042 */ /* 043 */ public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes, /* 044 */ boolean columnNullables[]) { /* 044 */ this.input = input; /* 046 */ this.columnTypes = columnTypes; /* 047 */ this.columnIndexes = columnIndexes; /* 048 */ } /* 049 */ /* 050 */ /* 051 */ private org.apache.spark.sql.execution.columnar.IntColumnAccessor getIntColumnAccessor(int idx) { /* 052 */ byte[] buffer = batch.buffers()[columnIndexes[idx]]; /* 053 */ return new org.apache.spark.sql.execution.columnar.IntColumnAccessor(ByteBuffer.wrap(buffer).order(nativeOrder)); /* 054 */ } /* 055 */ /* 056 */ /* 057 */ /* 058 */ /* 059 */ /* 060 */ /* 061 */ public boolean hasNext() { /* 062 */ if (currentRow < numRowsInBatch) { /* 063 */ return true; /* 064 */ } /* 065 */ if (!input.hasNext()) { /* 066 */ return false; /* 067 */ } /* 068 */ /* 069 */ batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); /* 070 */ currentRow = 0; /* 071 */ numRowsInBatch = batch.numRows(); /* 072 */ accessor = getIntColumnAccessor(0); /* 073 */ accessor1 = getIntColumnAccessor(1); /* 074 */ /* 075 */ return hasNext(); /* 076 */ } /* 077 */ /* 078 */ public InternalRow next() { /* 079 */ currentRow += 1; /* 080 */ bufferHolder.reset(); /* 081 */ rowWriter.zeroOutNullBytes(); /* 082 */ accessor.extractTo(mutableRow, 0); /* 083 */ accessor1.extractTo(mutableRow, 1); /* 084 */ unsafeRow.setTotalSize(bufferHolder.totalSize()); /* 085 */ return unsafeRow; /* 086 */ } ``` (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: Kazuaki Ishizaki Closes #11984 from kiszk/SPARK-14138. --- .../columnar/GenerateColumnAccessor.scala | 50 ++++++++++++++++--- .../columnar/InMemoryColumnarQuerySuite.scala | 10 ++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index eaafc96e4d2e7..4d01b78c3c10f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.columnar +import scala.collection.mutable + import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -88,7 +90,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera case array: ArrayType => classOf[ArrayColumnAccessor].getName case t: MapType => classOf[MapColumnAccessor].getName } - ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;") + ctx.addMutableState(accessorCls, accessorName, "") val createCode = dt match { case t if ctx.isPrimitiveType(dt) => @@ -97,7 +99,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" case other => s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder), - (${dt.getClass.getName}) columnTypes[$index]);""" + (${dt.getClass.getName}) columnTypes[$index]);""" } val extract = s"$accessorName.extractTo(mutableRow, $index);" @@ -114,6 +116,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera (createCode, extract + patch) }.unzip + /* + * 200 = 6000 bytes / 30 (up to 30 bytes per one call)) + * the maximum byte code size to be compiled for HotSpot is 8000. + * We should keep less than 8000 + */ + val numberOfStatementsThreshold = 200 + val (initializerAccessorCalls, extractorCalls) = + if (initializeAccessors.length <= numberOfStatementsThreshold) { + (initializeAccessors.mkString("\n"), extractors.mkString("\n")) + } else { + val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold) + val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold) + var groupedAccessorsLength = 0 + groupedAccessorsItr.zipWithIndex.map { case (body, i) => + groupedAccessorsLength += 1 + val funcName = s"accessors$i" + val funcCode = s""" + |private void $funcName() { + | ${body.mkString("\n")} + |} + """.stripMargin + ctx.addNewFunction(funcName, funcCode) + } + groupedExtractorsItr.zipWithIndex.map { case (body, i) => + val funcName = s"extractors$i" + val funcCode = s""" + |private void $funcName() { + | ${body.mkString("\n")} + |} + """.stripMargin + ctx.addNewFunction(funcName, funcCode) + } + ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"), + (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")) + } + val code = s""" import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -149,8 +187,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera this.nativeOrder = ByteOrder.nativeOrder(); this.buffers = new byte[${columnTypes.length}][]; this.mutableRow = new MutableUnsafeRow(rowWriter); - - ${initMutableStates(ctx)} } public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { @@ -159,6 +195,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera this.columnIndexes = columnIndexes; } + ${declareAddedFunctions(ctx)} + public boolean hasNext() { if (currentRow < numRowsInBatch) { return true; @@ -173,7 +211,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera for (int i = 0; i < columnIndexes.length; i ++) { buffers[i] = batch.buffers()[columnIndexes[i]]; } - ${initializeAccessors.mkString("\n")} + ${initializerAccessorCalls} return hasNext(); } @@ -182,7 +220,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera currentRow += 1; bufferHolder.reset(); rowWriter.initialize(bufferHolder, $numFields); - ${extractors.mkString("\n")} + ${extractorCalls} unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize()); return unsafeRow; } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 25afed25c897b..557415b801d82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -219,4 +219,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { assert(data.count() === 10) assert(data.filter($"s" === "3").count() === 1) } + + test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") { + val length1 = 3999 + val columnTypes1 = List.fill(length1)(IntegerType) + val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1) + + val length2 = 10000 + val columnTypes2 = List.fill(length2)(IntegerType) + val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2) + } }