From b6ab63359e00d7fe0175204f191ff1baa10b789f Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Thu, 17 Aug 2017 14:09:32 +0900
Subject: [PATCH] Add static util methods to create the on/off-heap vectors.

---
 .../VectorizedParquetRecordReader.java   | 12 ++++------
 .../vectorized/AggregateHashMap.java     |  5 +----
 .../vectorized/ColumnVectorUtils.java    | 12 +++++-----
 .../vectorized/OffHeapColumnVector.java  | 22 +++++++++++++++++++
 .../vectorized/OnHeapColumnVector.java   | 22 +++++++++++++++++++
 .../VectorizedHashMapGenerator.scala     |  8 ++-----
 6 files changed, 56 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9b1d046d4edc7..910f004592477 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -178,14 +178,10 @@ public void initBatch(MemoryMode memMode, StructType partitionColumns,
     }
 
     int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
-    columnVectors = new MutableColumnVector[batchSchema.fields().length];
-    for (int i = 0; i < batchSchema.fields().length; i++) {
-      StructField field = batchSchema.fields()[i];
-      if (memMode == MemoryMode.OFF_HEAP) {
-        columnVectors[i] = new OffHeapColumnVector(capacity, field.dataType());
-      } else {
-        columnVectors[i] = new OnHeapColumnVector(capacity, field.dataType());
-      }
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
     }
     columnarBatch = new ColumnarBatch(batchSchema, columnVectors, capacity);
     if (partitionColumns != null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
index f4d94ea63f414..1c94f706dc685 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
@@ -63,10 +63,7 @@ public AggregateHashMap(StructType schema, int capacity, double loadFactor, int
     this.maxSteps = maxSteps;
     numBuckets = (int) (capacity / loadFactor);
-    columnVectors = new OnHeapColumnVector[schema.fields().length];
-    for (int i = 0; i < schema.fields().length; i++) {
-      columnVectors[i] = new OnHeapColumnVector(capacity, schema.fields()[i].dataType());
-    }
+    columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
     batch = new ColumnarBatch(schema, columnVectors, capacity);
     buckets = new int[numBuckets];
     Arrays.fill(buckets, -1);
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index dd4adeb50917f..7030bfd235a3e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -199,13 +199,11 @@ private static void appendValue(MutableColumnVector dst, DataType t, Row src, in
   public static ColumnarBatch toBatch(
       StructType schema, MemoryMode memMode, Iterator<Row> row) {
     int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
-    MutableColumnVector[] columnVectors = new MutableColumnVector[schema.fields().length];
-    for (int i = 0; i < schema.fields().length; i++) {
-      if (memMode == MemoryMode.OFF_HEAP) {
-        columnVectors[i] = new OffHeapColumnVector(capacity, schema.fields()[i].dataType());
-      } else {
-        columnVectors[i] = new OnHeapColumnVector(capacity, schema.fields()[i].dataType());
-      }
+    MutableColumnVector[] columnVectors;
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
     }
 
     int n = 0;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index d386c86a3514c..0fe5e99bb4053 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -30,6 +30,28 @@ public final class OffHeapColumnVector extends MutableColumnVector {
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index aab02ea34ab5d..69f6b71e67233 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -32,6 +32,28 @@ public final class OnHeapColumnVector extends MutableColumnVector {
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index a624245b757b9..13f79275cac41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -91,12 +91,8 @@ class VectorizedHashMapGenerator(
        |  $generatedAggBufferSchema
       |
        |  public $generatedClassName() {
-       |    batchVectors = new org.apache.spark.sql.execution.vectorized
-       |      .OnHeapColumnVector[schema.fields().length];
-       |    for (int i = 0; i < schema.fields().length; i++) {
-       |      batchVectors[i] = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector(
-       |        capacity, schema.fields()[i].dataType());
-       |    }
+       |    batchVectors = org.apache.spark.sql.execution.vectorized
+       |      .OnHeapColumnVector.allocateColumns(capacity, schema);
        |    batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
        |      schema, batchVectors, capacity);
        |
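
A quick usage sketch of the new factories, for context (illustrative only, not part of
the patch): the wrapper class, method name, and example schema below are invented, but
the allocateColumns calls and the ColumnarBatch wiring mirror the initBatch change in
VectorizedParquetRecordReader above.

    // Hypothetical caller of the new static factory methods.
    import org.apache.spark.memory.MemoryMode;
    import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
    import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class AllocateColumnsExample {
      public static ColumnarBatch newBatch(MemoryMode memMode) {
        // Example schema; any StructType (or StructField[]) works.
        StructType schema = new StructType()
            .add("id", DataTypes.LongType)
            .add("name", DataTypes.StringType);
        int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;

        // One call per memory mode replaces the old per-field allocation loop.
        MutableColumnVector[] columnVectors;
        if (memMode == MemoryMode.OFF_HEAP) {
          columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
        } else {
          columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
        }
        return new ColumnarBatch(schema, columnVectors, capacity);
      }
    }

The StructField[] overloads serve callers that already hold the fields array; the
StructType overloads simply delegate via schema.fields(), so every call site picks
the memory mode once instead of per column.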