Add static util methods to create the on/off-heap vectors.
ueshin committed Aug 17, 2017
1 parent 2769b39 commit b6ab633
Showing 6 changed files with 56 additions and 25 deletions.
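
Every call site in this diff follows the same pattern: instead of looping over the schema's fields and constructing one vector per field, the caller asks the concrete vector class for the whole array at once and only branches on the memory mode. Below is a minimal, self-contained sketch of that pattern; the helper class, method name, and example schema are made up for illustration, while MemoryMode, the allocateColumns factories, and the ColumnarBatch constructor are used as they appear in the diff (the constructor call mirrors the in-package call sites shown in the context lines).

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper illustrating the call-site pattern used in this commit.
public class AllocateColumnsExample {

  static ColumnarBatch newBatch(StructType schema, MemoryMode memMode) {
    int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
    MutableColumnVector[] columnVectors;
    if (memMode == MemoryMode.OFF_HEAP) {
      columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
    } else {
      columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
    }
    // Same constructor call as the context lines in the hunks below; those
    // call sites live in org.apache.spark.sql.execution.vectorized.
    return new ColumnarBatch(schema, columnVectors, capacity);
  }

  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("value", DataTypes.DoubleType);
    ColumnarBatch batch = newBatch(schema, MemoryMode.ON_HEAP);
    System.out.println(batch.numCols());  // 2
  }
}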
@@ -178,14 +178,10 @@ public void initBatch(MemoryMode memMode, StructType partitionColumns,
     }
 
     int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
-    columnVectors = new MutableColumnVector[batchSchema.fields().length];
-    for (int i = 0; i < batchSchema.fields().length; i++) {
-      StructField field = batchSchema.fields()[i];
-      if (memMode == MemoryMode.OFF_HEAP) {
-        columnVectors[i] = new OffHeapColumnVector(capacity, field.dataType());
-      } else {
-        columnVectors[i] = new OnHeapColumnVector(capacity, field.dataType());
-      }
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
     }
     columnarBatch = new ColumnarBatch(batchSchema, columnVectors, capacity);
     if (partitionColumns != null) {
@@ -63,10 +63,7 @@ public AggregateHashMap(StructType schema, int capacity, double loadFactor, int
 
     this.maxSteps = maxSteps;
     numBuckets = (int) (capacity / loadFactor);
-    columnVectors = new OnHeapColumnVector[schema.fields().length];
-    for (int i = 0; i < schema.fields().length; i++) {
-      columnVectors[i] = new OnHeapColumnVector(capacity, schema.fields()[i].dataType());
-    }
+    columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
     batch = new ColumnarBatch(schema, columnVectors, capacity);
     buckets = new int[numBuckets];
     Arrays.fill(buckets, -1);
@@ -199,13 +199,11 @@ private static void appendValue(MutableColumnVector dst, DataType t, Row src, in
   public static ColumnarBatch toBatch(
       StructType schema, MemoryMode memMode, Iterator<Row> row) {
     int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
-    MutableColumnVector[] columnVectors = new MutableColumnVector[schema.fields().length];
-    for (int i = 0; i < schema.fields().length; i++) {
-      if (memMode == MemoryMode.OFF_HEAP) {
-        columnVectors[i] = new OffHeapColumnVector(capacity, schema.fields()[i].dataType());
-      } else {
-        columnVectors[i] = new OnHeapColumnVector(capacity, schema.fields()[i].dataType());
-      }
+    MutableColumnVector[] columnVectors;
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
     }
 
     int n = 0;
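
The toBatch helper in the hunk above now only differs by which factory it calls; feeding it rows is unchanged. A short usage sketch under stated assumptions: the utility class enclosing toBatch is not named in this diff, so ColumnVectorUtils below is a placeholder, and the input rows are built with RowFactory purely for illustration.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ToBatchExample {
  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("score", DataTypes.DoubleType);
    List<Row> rows = Arrays.asList(
        RowFactory.create(1, 1.0),
        RowFactory.create(2, 2.5));
    // ColumnVectorUtils is an assumed name for the class containing toBatch.
    ColumnarBatch batch =
        ColumnVectorUtils.toBatch(schema, MemoryMode.ON_HEAP, rows.iterator());
    System.out.println(batch.numRows());  // 2
  }
}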
@@ -30,6 +30,28 @@ public final class OffHeapColumnVector extends MutableColumnVector {
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
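
The off-heap factory hands the caller vectors backed by native allocations rather than Java arrays, so the caller owns their lifetime. A minimal sketch, assuming the putInt/getInt accessors and the close() method of the existing column-vector API (none of which are part of this diff):

import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class OffHeapAllocateExample {
  public static void main(String[] args) {
    StructType schema = new StructType().add("id", DataTypes.IntegerType);
    OffHeapColumnVector[] vectors = OffHeapColumnVector.allocateColumns(1024, schema);
    try {
      for (int i = 0; i < 3; i++) {
        vectors[0].putInt(i, i * 10);            // assumed writer API
      }
      System.out.println(vectors[0].getInt(2));  // assumed reader API, prints 20
    } finally {
      for (OffHeapColumnVector v : vectors) {
        v.close();  // assumed to release the native buffers; they are not garbage collected
      }
    }
  }
}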
@@ -32,6 +32,28 @@ public final class OnHeapColumnVector extends MutableColumnVector {
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
 
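
As the first overload shows, allocateColumns(capacity, schema) simply delegates to allocateColumns(capacity, schema.fields()), so a caller that already holds a StructField[] (for example, a slice of a larger schema) can use it directly. A brief illustration with a made-up schema:

import java.util.Arrays;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class AllocateFieldsExample {
  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("price", DataTypes.DoubleType)
        .add("qty", DataTypes.IntegerType);

    // Allocate one on-heap vector per field of the full schema.
    OnHeapColumnVector[] all = OnHeapColumnVector.allocateColumns(1024, schema);

    // Allocate vectors for just the last two fields, without building a new StructType.
    StructField[] subset = Arrays.copyOfRange(schema.fields(), 1, 3);
    OnHeapColumnVector[] some = OnHeapColumnVector.allocateColumns(1024, subset);

    System.out.println(all.length + " " + some.length);  // 3 2
  }
}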
@@ -91,12 +91,8 @@ class VectorizedHashMapGenerator(
      | $generatedAggBufferSchema
      |
      | public $generatedClassName() {
-     |   batchVectors = new org.apache.spark.sql.execution.vectorized
-     |       .OnHeapColumnVector[schema.fields().length];
-     |   for (int i = 0; i < schema.fields().length; i++) {
-     |     batchVectors[i] = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector(
-     |       capacity, schema.fields()[i].dataType());
-     |   }
+     |   batchVectors = org.apache.spark.sql.execution.vectorized
+     |       .OnHeapColumnVector.allocateColumns(capacity, schema);
      |   batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
      |     schema, batchVectors, capacity);
      |
