adapt parquet
hn5092 committed Dec 8, 2018
1 parent 4661ac7 commit 45c3b35
Showing 7 changed files with 50 additions and 23 deletions.
3 changes: 2 additions & 1 deletion examples/pom.xml
@@ -110,8 +110,9 @@
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>${parquet.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
6 changes: 3 additions & 3 deletions pom.xml
@@ -130,7 +130,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.10.0</parquet.version>
<parquet.version>1.10.1-SNAPSHOT</parquet.version>
<orc.version>1.5.2</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
@@ -1801,9 +1801,9 @@
<scope>${parquet.test.deps.scope}</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>${hive.parquet.version}</version>
<version>${parquet.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
Client.scala
@@ -523,6 +523,7 @@ private[spark] class Client(
jars.foreach { jar =>
if (!isLocalUri(jar)) {
val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
logInfo("Path " + path)
val pathFs = FileSystem.get(path.toUri(), hadoopConf)
pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
val uri = entry.getPath().toUri()
VectorizedParquetRecordReader.java
@@ -17,26 +17,25 @@

package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.Type;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;

/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -127,7 +126,7 @@ public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException, UnsupportedOperationException {
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext);
initializeInternal();
}
@@ -138,7 +137,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
*/
@Override
public void initialize(String path, List<String> columns) throws IOException,
UnsupportedOperationException {
UnsupportedOperationException {
super.initialize(path, columns);
initializeInternal();
}
@@ -183,9 +182,9 @@ public float getProgress() {
// Column 2: partitionValues[0]
// Column 3: partitionValues[1]
private void initBatch(
MemoryMode memMode,
StructType partitionColumns,
InternalRow partitionValues) {
MemoryMode memMode,
StructType partitionColumns,
InternalRow partitionValues) {
StructType batchSchema = new StructType();
for (StructField f: sparkSchema.fields()) {
batchSchema = batchSchema.add(f);
@@ -253,7 +252,7 @@ public boolean nextBatch() throws IOException {
}
columnarBatch.setNumRows(0);
if (rowsReturned >= totalRowCount) return false;
checkEndOfRowGroup();
if (reader.useColumnIndexFilter()) {
if (checkEndOfRowGroupV2()) {
return false;
}
} else {
checkEndOfRowGroup();
}

int num = (int) Math.min((long) capacity, totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
@@ -289,7 +294,7 @@ private void initializeInternal() throws IOException, UnsupportedOperationExcept
if (columns.get(i).getMaxDefinitionLevel() == 0) {
// Column is missing in data but the required data is non-nullable. This file is invalid.
throw new IOException("Required column is missing in data file. Col: " +
Arrays.toString(colPath));
Arrays.toString(colPath));
}
missingColumns[i] = true;
}
@@ -301,16 +306,35 @@ private void checkEndOfRowGroup() throws IOException {
PageReadStore pages = reader.readNextRowGroup();
if (pages == null) {
throw new IOException("expecting more rows but reached last block. Read "
+ rowsReturned + " out of " + totalRowCount);
+ rowsReturned + " out of " + totalRowCount);
}
List<ColumnDescriptor> columns = requestedSchema.getColumns();
List<Type> types = requestedSchema.asGroupType().getFields();
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)), convertTz);
pages.getPageReader(columns.get(i)), convertTz);
}
totalCountLoadedSoFar += pages.getRowCount();
}

private boolean checkEndOfRowGroupV2() throws IOException {
if (rowsReturned != totalCountLoadedSoFar) return false;
PageReadStore pages = reader.readNextFilteredRowGroup();
if (pages == null) {
return true;
}
List<ColumnDescriptor> columns = requestedSchema.getColumns();
List<Type> types = requestedSchema.asGroupType().getFields();
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)), convertTz);
}
totalCountLoadedSoFar += pages.getRowCount();
return false;
}

}
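
The nextBatch()/checkEndOfRowGroupV2() change above is the core of the adaptation: when the underlying file reader is configured for column-index filtering, row groups are pulled with readNextFilteredRowGroup(), and a null result means the (possibly filtered-out) row groups are simply exhausted, so the scan ends cleanly instead of throwing. Note that reader.useColumnIndexFilter() is not stock parquet-mr API and presumably comes from the 1.10.1-SNAPSHOT build referenced in the POM changes. As a rough standalone illustration of the same read loop against plain parquet-mr (readNextFilteredRowGroup() is public in the forked build used here and in later upstream releases), a sketch could look like this; the class name, the file-path argument, and the id > 100 predicate are invented for the example:

// Hypothetical standalone sketch; not part of the commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FilteredRowGroupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Predicate pushed down to the reader; with column indexes present in the
    // file, pages that provably cannot contain id > 100 are skipped.
    FilterPredicate pred = FilterApi.gt(FilterApi.longColumn("id"), 100L);
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withRecordFilter(FilterCompat.get(pred))
        .build();

    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path(args[0]), conf), options)) {
      PageReadStore pages;
      // readNextFilteredRowGroup() hands back only the surviving pages of the
      // next row group; null means the row groups are exhausted -- the same
      // condition checkEndOfRowGroupV2() above turns into "no more batches".
      while ((pages = reader.readNextFilteredRowGroup()) != null) {
        System.out.println("rows after column-index filtering: " + pages.getRowCount());
      }
    }
  }
}

Because filtering can drop whole pages and row groups, the filtered path cannot assume that rowsReturned will ever reach totalRowCount, which is why checkEndOfRowGroupV2() reports end-of-input by returning true where checkEndOfRowGroup() throws.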
ParquetFileFormat.scala
@@ -289,7 +289,7 @@ class ParquetFileFormat
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
true
false
}

override def buildReaderWithPartitionValues(
ParquetFilters.scala
@@ -541,6 +541,7 @@ private[parquet] class ParquetFilters(
}

override def keep(value: Binary): Boolean = {
if (value == null) return false
UTF8String.fromBytes(value.getBytes).startsWith(
UTF8String.fromBytes(strToBinary.getBytes))
}
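
The one-line guard added to keep(Binary) above matters because parquet-mr's filter evaluation can probe a user-defined predicate with a null value (for instance to decide whether null cells satisfy the pushed-down startsWith), and the unguarded code would have hit a NullPointerException on value.getBytes. A self-contained analogue in plain parquet-mr Java is sketched below; the StartsWith class and its prefix handling are hypothetical stand-ins, not Spark's actual implementation in ParquetFilters.scala:

// Illustrative only: a user-defined predicate with the same null guard.
import java.io.Serializable;

import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.io.api.Binary;

final class StartsWith extends UserDefinedPredicate<Binary> implements Serializable {
  private final byte[] prefix;

  StartsWith(String prefix) {
    this.prefix = Binary.fromString(prefix).getBytes();
  }

  @Override
  public boolean keep(Binary value) {
    // The filter evaluator may pass null (e.g. to decide whether null cells
    // match); without this guard value.getBytes() would throw an NPE.
    if (value == null) return false;
    byte[] bytes = value.getBytes();
    if (bytes.length < prefix.length) return false;
    for (int i = 0; i < prefix.length; i++) {
      if (bytes[i] != prefix[i]) return false;
    }
    return true;
  }

  @Override
  public boolean canDrop(Statistics<Binary> statistics) {
    return false; // conservative: never drop a page/row group on min/max alone
  }

  @Override
  public boolean inverseCanDrop(Statistics<Binary> statistics) {
    return false;
  }
}

Such a predicate would be attached with something like FilterApi.userDefined(FilterApi.binaryColumn("name"), new StartsWith("abc")); the Scala code in ParquetFilters builds the equivalent filter for pushdown.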
2 changes: 1 addition & 1 deletion sql/hive/pom.xml
@@ -37,7 +37,7 @@
<dependencies>
<!-- Added for Hive Parquet SerDe -->
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</dependency>
<dependency>
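
Taken together, the three POM edits (examples/pom.xml, the root pom.xml, and sql/hive/pom.xml) drop the old com.twitter coordinates, previously tied to ${hive.parquet.version} = 1.6.0, and point every module at the org.apache.parquet bundle built as 1.10.1-SNAPSHOT. A rough consolidated view of the resulting declarations, assembled from the hunks above, is:

<!-- root pom.xml: the shared version property now points at the snapshot build -->
<parquet.version>1.10.1-SNAPSHOT</parquet.version>

<!-- root pom.xml: parquet-hadoop-bundle now keyed to ${parquet.version} -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop-bundle</artifactId>
  <version>${parquet.version}</version>
  <scope>compile</scope>
</dependency>

<!-- examples/pom.xml -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop-bundle</artifactId>
  <version>${parquet.version}</version>
  <scope>provided</scope>
</dependency>

<!-- sql/hive/pom.xml: no explicit version, so it resolves via the root POM -->
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop-bundle</artifactId>
</dependency>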
