
Commit

Rename MutableColumnVector to WritableColumnVector.
ueshin committed Aug 21, 2017
1 parent d7b77f7 commit 4d94655
Showing 12 changed files with 83 additions and 83 deletions.
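For orientation, here is a minimal sketch (not part of this commit) of how caller code writes through the renamed class. It assumes the OnHeapColumnVector.allocateColumns factory and the putInt/getInt accessors that appear elsewhere in this diff; the class name WritableColumnVectorSketch, the column name "id", the capacity, and the values are purely illustrative.

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class WritableColumnVectorSketch {
  public static void main(String[] args) {
    // Hypothetical single-column schema; "id" and the capacity of 16 are made up for illustration.
    StructType schema = new StructType().add("id", DataTypes.IntegerType);
    WritableColumnVector[] vectors = OnHeapColumnVector.allocateColumns(16, schema);

    // Writes go through the writable API (formerly MutableColumnVector).
    for (int i = 0; i < 16; i++) {
      vectors[0].putInt(i, i * 2);
    }

    // Reads still use the read-only ColumnVector surface that WritableColumnVector extends.
    System.out.println(vectors[0].getInt(3));  // prints 6

    for (WritableColumnVector v : vectors) {
      v.close();
    }
  }
}

As the hunks below show, the change is a mechanical rename: every occurrence of MutableColumnVector becomes WritableColumnVector, with no behavioral change.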
@@ -30,7 +30,7 @@

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

@@ -136,9 +136,9 @@ private boolean next() throws IOException {
/**
* Reads `total` values from this columnReader into column.
*/
-void readBatch(int total, MutableColumnVector column) throws IOException {
+void readBatch(int total, WritableColumnVector column) throws IOException {
int rowId = 0;
-MutableColumnVector dictionaryIds = null;
+WritableColumnVector dictionaryIds = null;
if (dictionary != null) {
// SPARK-16334: We only maintain a single dictionary per row batch, so that it can be used to
// decode all previous dictionary encoded pages if we ever encounter a non-dictionary encoded
@@ -223,7 +223,7 @@ void readBatch(int total, MutableColumnVector column) throws IOException {
private void decodeDictionaryIds(
int rowId,
int num,
-MutableColumnVector column,
+WritableColumnVector column,
ColumnVector dictionaryIds) {
switch (descriptor.getType()) {
case INT32:
@@ -350,13 +350,13 @@ private void decodeDictionaryIds(
* is guaranteed that num is smaller than the number of values left in the current page.
*/

-private void readBooleanBatch(int rowId, int num, MutableColumnVector column) throws IOException {
+private void readBooleanBatch(int rowId, int num, WritableColumnVector column) throws IOException {
assert(column.dataType() == DataTypes.BooleanType);
defColumn.readBooleans(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}

-private void readIntBatch(int rowId, int num, MutableColumnVector column) throws IOException {
+private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
@@ -374,7 +374,7 @@ private void readIntBatch(int rowId, int num, MutableColumnVector column) throws
}
}

-private void readLongBatch(int rowId, int num, MutableColumnVector column) throws IOException {
+private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
@@ -393,7 +393,7 @@ private void readLongBatch(int rowId, int num, MutableColumnVector column) throw
}
}

-private void readFloatBatch(int rowId, int num, MutableColumnVector column) throws IOException {
+private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
if (column.dataType() == DataTypes.FloatType) {
@@ -404,7 +404,7 @@ private void readFloatBatch(int rowId, int num, MutableColumnVector column) thro
}
}

-private void readDoubleBatch(int rowId, int num, MutableColumnVector column) throws IOException {
+private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.DoubleType) {
@@ -415,7 +415,7 @@ private void readDoubleBatch(int rowId, int num, MutableColumnVector column) thr
}
}

-private void readBinaryBatch(int rowId, int num, MutableColumnVector column) throws IOException {
+private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -439,7 +439,7 @@ private void readBinaryBatch(int rowId, int num, MutableColumnVector column) thr
private void readFixedLenByteArrayBatch(
int rowId,
int num,
-MutableColumnVector column,
+WritableColumnVector column,
int arrayLen) throws IOException {
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
// This is where we implement support for the valid type conversions.
@@ -31,7 +31,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.StructField;
@@ -93,7 +93,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private ColumnarBatch columnarBatch;

-private MutableColumnVector[] columnVectors;
+private WritableColumnVector[] columnVectors;

/**
* If true, this class returns batches instead of rows.
@@ -20,7 +20,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

-import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.unsafe.Platform;

import org.apache.parquet.column.values.ValuesReader;
@@ -56,39 +56,39 @@ public void skip() {
}

@Override
-public final void readBooleans(int total, MutableColumnVector c, int rowId) {
+public final void readBooleans(int total, WritableColumnVector c, int rowId) {
// TODO: properly vectorize this
for (int i = 0; i < total; i++) {
c.putBoolean(rowId + i, readBoolean());
}
}

@Override
-public final void readIntegers(int total, MutableColumnVector c, int rowId) {
+public final void readIntegers(int total, WritableColumnVector c, int rowId) {
c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
offset += 4 * total;
}

@Override
-public final void readLongs(int total, MutableColumnVector c, int rowId) {
+public final void readLongs(int total, WritableColumnVector c, int rowId) {
c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
offset += 8 * total;
}

@Override
-public final void readFloats(int total, MutableColumnVector c, int rowId) {
+public final void readFloats(int total, WritableColumnVector c, int rowId) {
c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
offset += 4 * total;
}

@Override
-public final void readDoubles(int total, MutableColumnVector c, int rowId) {
+public final void readDoubles(int total, WritableColumnVector c, int rowId) {
c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
offset += 8 * total;
}

@Override
-public final void readBytes(int total, MutableColumnVector c, int rowId) {
+public final void readBytes(int total, WritableColumnVector c, int rowId) {
for (int i = 0; i < total; i++) {
// Bytes are stored as a 4-byte little endian int. Just read the first byte.
// TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
@@ -159,7 +159,7 @@ public final double readDouble() {
}

@Override
-public final void readBinary(int total, MutableColumnVector v, int rowId) {
+public final void readBinary(int total, WritableColumnVector v, int rowId) {
for (int i = 0; i < total; i++) {
int len = readInteger();
int start = offset;
@@ -25,7 +25,7 @@
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;

-import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
* A values reader for Parquet's run-length encoded data. This is based off of the version in
@@ -179,7 +179,7 @@ public int readInteger() {
*/
public void readIntegers(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -214,7 +214,7 @@ public void readIntegers(
// TODO: can this code duplication be removed without a perf penalty?
public void readBooleans(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -248,7 +248,7 @@ public void readBooleans(

public void readBytes(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -282,7 +282,7 @@ public void readBytes(

public void readShorts(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -318,7 +318,7 @@ public void readShorts(

public void readLongs(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -352,7 +352,7 @@ public void readLongs(

public void readFloats(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -386,7 +386,7 @@ public void readFloats(

public void readDoubles(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -420,7 +420,7 @@ public void readDoubles(

public void readBinarys(
int total,
-MutableColumnVector c,
+WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -458,8 +458,8 @@ public void readBinarys(
*/
public void readIntegers(
int total,
-MutableColumnVector values,
-MutableColumnVector nulls,
+WritableColumnVector values,
+WritableColumnVector nulls,
int rowId,
int level,
VectorizedValuesReader data) {
@@ -496,7 +496,7 @@ public void readIntegers(
// IDs. This is different than the above APIs that decodes definitions levels along with values.
// Since this is only used to decode dictionary IDs, only decoding integers is supported.
@Override
-public void readIntegers(int total, MutableColumnVector c, int rowId) {
+public void readIntegers(int total, WritableColumnVector c, int rowId) {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -522,32 +522,32 @@ public byte readByte() {
}

@Override
-public void readBytes(int total, MutableColumnVector c, int rowId) {
+public void readBytes(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
-public void readLongs(int total, MutableColumnVector c, int rowId) {
+public void readLongs(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
-public void readBinary(int total, MutableColumnVector c, int rowId) {
+public void readBinary(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
-public void readBooleans(int total, MutableColumnVector c, int rowId) {
+public void readBooleans(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
-public void readFloats(int total, MutableColumnVector c, int rowId) {
+public void readFloats(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
-public void readDoubles(int total, MutableColumnVector c, int rowId) {
+public void readDoubles(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet;

-import org.apache.spark.sql.execution.vectorized.MutableColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

import org.apache.parquet.io.api.Binary;

@@ -37,11 +37,11 @@ public interface VectorizedValuesReader {
/*
* Reads `total` values into `c` start at `c[rowId]`
*/
-void readBooleans(int total, MutableColumnVector c, int rowId);
-void readBytes(int total, MutableColumnVector c, int rowId);
-void readIntegers(int total, MutableColumnVector c, int rowId);
-void readLongs(int total, MutableColumnVector c, int rowId);
-void readFloats(int total, MutableColumnVector c, int rowId);
-void readDoubles(int total, MutableColumnVector c, int rowId);
-void readBinary(int total, MutableColumnVector c, int rowId);
+void readBooleans(int total, WritableColumnVector c, int rowId);
+void readBytes(int total, WritableColumnVector c, int rowId);
+void readIntegers(int total, WritableColumnVector c, int rowId);
+void readLongs(int total, WritableColumnVector c, int rowId);
+void readFloats(int total, WritableColumnVector c, int rowId);
+void readDoubles(int total, WritableColumnVector c, int rowId);
+void readBinary(int total, WritableColumnVector c, int rowId);
}
@@ -40,7 +40,7 @@ public class ColumnVectorUtils {
/**
* Populates the entire `col` with `row[fieldIdx]`
*/
-public static void populate(MutableColumnVector col, InternalRow row, int fieldIdx) {
+public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
int capacity = col.capacity;
DataType t = col.dataType();

@@ -115,7 +115,7 @@ public static Object toPrimitiveJavaArray(ColumnVector.Array array) {
}
}

-private static void appendValue(MutableColumnVector dst, DataType t, Object o) {
+private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
if (o == null) {
if (t instanceof CalendarIntervalType) {
dst.appendStruct(true);
@@ -165,7 +165,7 @@ private static void appendValue(MutableColumnVector dst, DataType t, Object o) {
}
}

-private static void appendValue(MutableColumnVector dst, DataType t, Row src, int fieldIdx) {
+private static void appendValue(WritableColumnVector dst, DataType t, Row src, int fieldIdx) {
if (t instanceof ArrayType) {
ArrayType at = (ArrayType)t;
if (src.isNullAt(fieldIdx)) {
@@ -199,7 +199,7 @@ private static void appendValue(MutableColumnVector dst, DataType t, Row src, in
public static ColumnarBatch toBatch(
StructType schema, MemoryMode memMode, Iterator<Row> row) {
int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
-MutableColumnVector[] columnVectors;
+WritableColumnVector[] columnVectors;
if (memMode == MemoryMode.OFF_HEAP) {
columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
} else {
