Skip to content

Commit

Permalink
Revert "add support for parquet reader (#3852)"
Browse files Browse the repository at this point in the history
This reverts commit 69db491.
  • Loading branch information
Seunghyun Lee authored Apr 3, 2019
1 parent fdd9b92 commit bb57c10
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 368 deletions.
5 changes: 0 additions & 5 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,6 @@
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down

This file was deleted.

5 changes: 0 additions & 5 deletions pinot-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,6 @@
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
Expand Down Expand Up @@ -52,7 +54,7 @@ public AvroRecordReader(File dataFile, Schema schema)
_fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
_avroReader = AvroUtils.getAvroReader(dataFile);
try {
AvroUtils.validateSchema(_schema, _avroReader.getSchema());
validateSchema();
} catch (Exception e) {
_avroReader.close();
throw e;
Expand All @@ -64,6 +66,33 @@ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {

}

private void validateSchema() {
org.apache.avro.Schema avroSchema = _avroReader.getSchema();
for (FieldSpec fieldSpec : _fieldSpecs) {
String fieldName = fieldSpec.getName();
Field avroField = avroSchema.getField(fieldName);
if (avroField == null) {
LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName);
} else {
boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
boolean isAvroFieldSingleValue = AvroUtils.isSingleValueField(avroField);
if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi")
+ "-valued in Pinot schema but not in Avro schema";
LOGGER.error(errorMessage);
throw new IllegalStateException(errorMessage);
}

DataType pinotFieldDataType = fieldSpec.getDataType();
DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField);
if (pinotFieldDataType != avroFieldDataType) {
LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}",
fieldName, pinotFieldDataType, avroFieldDataType);
}
}
}
}

@Override
public boolean hasNext() {
return _avroReader.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
package org.apache.pinot.core.data.readers;

public enum FileFormat {
AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER
AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ public static RecordReader getRecordReader(SegmentGeneratorConfig segmentGenerat
// NOTE: PinotSegmentRecordReader does not support time conversion (field spec must match)
case PINOT:
return new PinotSegmentRecordReader(dataFile, schema, segmentGeneratorConfig.getColumnSortOrder());
case PARQUET:
return new ParquetRecordReader(dataFile, schema);
default:
throw new UnsupportedOperationException("Unsupported input file format: " + fileFormat);
}
Expand Down
36 changes: 0 additions & 36 deletions pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,22 +31,16 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.MetricFieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.TimeFieldSpec;
import org.apache.pinot.core.data.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class AvroUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroUtils.class);

private AvroUtils() {
}

Expand Down Expand Up @@ -275,33 +268,4 @@ private static org.apache.avro.Schema extractSupportedSchema(org.apache.avro.Sch
return fieldSchema;
}
}

/**
* Valid table schema with avro schema
*/
public static void validateSchema(Schema schema, org.apache.avro.Schema avroSchema) {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
String fieldName = fieldSpec.getName();
Field avroField = avroSchema.getField(fieldName);
if (avroField == null) {
LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName);
} else {
boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
boolean isAvroFieldSingleValue = AvroUtils.isSingleValueField(avroField);
if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi")
+ "-valued in Pinot schema but not in Avro schema";
LOGGER.error(errorMessage);
throw new IllegalStateException(errorMessage);
}

FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType();
FieldSpec.DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField);
if (pinotFieldDataType != avroFieldDataType) {
LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}",
fieldName, pinotFieldDataType, avroFieldDataType);
}
}
}
}
}
Loading

0 comments on commit bb57c10

Please sign in to comment.