Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Parquet related classes to the new package #4073

Merged
merged 1 commit into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,6 @@
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
36 changes: 15 additions & 21 deletions pinot-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,6 @@
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
Expand Down Expand Up @@ -154,6 +142,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
Expand Down Expand Up @@ -199,24 +191,26 @@

<!-- test -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.util.AvroUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Record reader for AVRO file.
*/
public class AvroRecordReader implements RecordReader {
private static final Logger LOGGER = LoggerFactory.getLogger(AvroRecordReader.class);

private final File _dataFile;
private final Schema _schema;
private final List<FieldSpec> _fieldSpecs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
package org.apache.pinot.core.data.readers;

public enum FileFormat {
AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER
AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public interface RecordReader extends Closeable {
/**
* Initializes the record reader when needed
*/
void init(SegmentGeneratorConfig segmentGeneratorConfig);
void init(SegmentGeneratorConfig segmentGeneratorConfig)
throws IOException;

/**
* Return <code>true</code> if more records remain to be read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ public static RecordReader getRecordReader(SegmentGeneratorConfig segmentGenerat
// If this is set, this will override the file format
if (recordReaderPath != null) {
if (fileFormat != FileFormat.OTHER) {
// We currently have default file format set to AVRO inside segment generator config,
// do not want to break this behavior for clients.
LOGGER.warn("Using recordReaderPath {} to read segment, ignoring fileformat {}",
recordReaderPath, fileFormat.toString());
// NOTE: we currently have default file format set to AVRO inside segment generator config, do not want to break
// this behavior for clients.
LOGGER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous format looks better to me :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but just follow the auto-format...

.warn("Using class: {} to read segment, ignoring configured file format: {}", recordReaderPath, fileFormat);
}
RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance();
recordReader.init(segmentGeneratorConfig);
Expand All @@ -70,8 +70,6 @@ public static RecordReader getRecordReader(SegmentGeneratorConfig segmentGenerat
// NOTE: PinotSegmentRecordReader does not support time conversion (field spec must match)
case PINOT:
return new PinotSegmentRecordReader(dataFile, schema, segmentGeneratorConfig.getColumnSortOrder());
case PARQUET:
return new ParquetRecordReader(dataFile, schema);
default:
throw new UnsupportedOperationException("Unsupported input file format: " + fileFormat);
}
Expand Down
70 changes: 31 additions & 39 deletions pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,15 +31,13 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.MetricFieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.TimeFieldSpec;
import org.apache.pinot.core.data.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +55,6 @@ private AvroUtils() {
* @param fieldTypeMap Map from column to field type
* @param timeUnit Time unit
* @return Pinot schema
* @throws IOException
*/
public static Schema getPinotSchemaFromAvroSchema(@Nonnull org.apache.avro.Schema avroSchema,
@Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit) {
Expand Down Expand Up @@ -103,7 +99,6 @@ public static Schema getPinotSchemaFromAvroSchema(@Nonnull org.apache.avro.Schem
* @param fieldTypeMap Map from column to field type
* @param timeUnit Time unit
* @return Pinot schema
* @throws IOException
*/
public static Schema getPinotSchemaFromAvroDataFile(@Nonnull File avroDataFile,
@Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit)
Expand All @@ -120,7 +115,6 @@ public static Schema getPinotSchemaFromAvroDataFile(@Nonnull File avroDataFile,
*
* @param avroDataFile Avro data file
* @return Pinot schema
* @throws IOException
*/
public static Schema getPinotSchemaFromAvroDataFile(@Nonnull File avroDataFile)
throws IOException {
Expand All @@ -134,7 +128,6 @@ public static Schema getPinotSchemaFromAvroDataFile(@Nonnull File avroDataFile)
* @param fieldTypeMap Map from column to field type
* @param timeUnit Time unit
* @return Pinot schema
* @throws IOException
*/
public static Schema getPinotSchemaFromAvroSchemaFile(@Nonnull File avroSchemaFile,
@Nullable Map<String, FieldSpec.FieldType> fieldTypeMap, @Nullable TimeUnit timeUnit)
Expand Down Expand Up @@ -203,16 +196,44 @@ public static org.apache.avro.Schema getAvroSchemaFromPinotSchema(Schema pinotSc
return fieldAssembler.endRecord();
}

/**
 * Validates a Pinot schema against an Avro schema.
 * <p>For every field spec in the Pinot schema:
 * <ul>
 *   <li>Missing Avro field: logged as a warning only.</li>
 *   <li>Single-value vs multi-value mismatch: logged as an error and rejected with
 *       {@link IllegalStateException}.</li>
 *   <li>Data type mismatch: logged as a warning only.</li>
 * </ul>
 *
 * @param schema Pinot schema to validate
 * @param avroSchema Avro schema to validate against
 * @throws IllegalStateException if a field's single/multi-value cardinality differs between the two schemas
 */
public static void validateSchema(Schema schema, org.apache.avro.Schema avroSchema) {
  for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
    String fieldName = fieldSpec.getName();
    Field avroField = avroSchema.getField(fieldName);
    if (avroField == null) {
      // Field only exists on the Pinot side; tolerated, but surfaced to the operator.
      LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName);
      continue;
    }

    boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
    boolean isAvroFieldSingleValue = AvroUtils.isSingleValueField(avroField);
    if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
      // Cardinality mismatch is fatal: records could not be converted consistently.
      String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi")
          + "-valued in Pinot schema but not in Avro schema";
      LOGGER.error(errorMessage);
      throw new IllegalStateException(errorMessage);
    }

    FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType();
    FieldSpec.DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField);
    if (pinotFieldDataType != avroFieldDataType) {
      // Type mismatch is non-fatal; downstream conversion may still succeed.
      LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}",
          fieldName, pinotFieldDataType, avroFieldDataType);
    }
  }
}

/**
 * Opens an Avro data-file reader over the given file.
 * <p>Files whose name ends with {@code .gz} are transparently decompressed with GZIP before
 * being handed to the Avro reader; all other files are read directly.
 *
 * @param avroFile Avro data file (optionally GZIP-compressed, indicated by a {@code .gz} suffix)
 * @return a {@link DataFileStream} of {@link GenericRecord}; the caller is responsible for closing it
 * @throws IOException if the file cannot be opened or is not a valid Avro data file
 */
public static DataFileStream<GenericRecord> getAvroReader(File avroFile)
    throws IOException {
  FileInputStream fileStream = new FileInputStream(avroFile);
  if (!avroFile.getName().endsWith(".gz")) {
    return new DataFileStream<>(fileStream, new GenericDatumReader<>());
  }
  // Wrap compressed files so the Avro reader sees the decompressed byte stream.
  return new DataFileStream<>(new GZIPInputStream(fileStream), new GenericDatumReader<>());
}

Expand Down Expand Up @@ -275,33 +296,4 @@ private static org.apache.avro.Schema extractSupportedSchema(org.apache.avro.Sch
return fieldSchema;
}
}

/**
 * Validates the given Pinot table schema against the given Avro schema.
 * <p>Missing Avro fields and data-type mismatches are logged as warnings only; a
 * single-value vs multi-value cardinality mismatch is treated as fatal.
 *
 * @param schema Pinot schema to validate
 * @param avroSchema Avro schema to validate against
 * @throws IllegalStateException if a field's cardinality (single/multi-value) differs between the schemas
 */
public static void validateSchema(Schema schema, org.apache.avro.Schema avroSchema) {
  for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
    String fieldName = fieldSpec.getName();
    Field avroField = avroSchema.getField(fieldName);
    if (avroField == null) {
      // Field missing on the Avro side is tolerated; only warn.
      LOGGER.warn("Pinot field: {} does not exist in Avro Schema", fieldName);
    } else {
      boolean isPinotFieldSingleValue = fieldSpec.isSingleValueField();
      boolean isAvroFieldSingleValue = AvroUtils.isSingleValueField(avroField);
      if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
        // Cardinality mismatch is fatal: records could not be mapped consistently.
        String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi")
            + "-valued in Pinot schema but not in Avro schema";
        LOGGER.error(errorMessage);
        throw new IllegalStateException(errorMessage);
      }

      FieldSpec.DataType pinotFieldDataType = fieldSpec.getDataType();
      FieldSpec.DataType avroFieldDataType = AvroUtils.extractFieldDataType(avroField);
      if (pinotFieldDataType != avroFieldDataType) {
        // Type mismatch is non-fatal; warn and continue.
        LOGGER.warn("Pinot field: {} of type: {} mismatches with corresponding field in Avro Schema of type: {}",
            fieldName, pinotFieldDataType, avroFieldDataType);
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ public SegmentCreationJob(Properties properties) {

@Override
protected boolean isDataFile(String fileName) {
// Other files may have different extensions, eg: orc can have no extension
// For custom record reader, treat all files as data file
if (_properties.getProperty(JobConfigConstants.RECORD_READER_PATH) != null) {
return true;
}
return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
.endsWith(".thrift") || fileName.endsWith(".parquet");
.endsWith(".thrift");
}

public void run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public void setup(Context context)
if (readerConfigFile != null) {
_readerConfigFile = new Path(readerConfigFile);
}

_recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH, null);
_recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RECORD_READER_CLASS_NAME seems to be more correct name... But, I guess that we are already using this config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so cannot change it lol


// Set up segment name generator
String segmentNameGeneratorType =
Expand Down Expand Up @@ -207,8 +206,8 @@ protected void map(LongWritable key, Text value, Context context)
segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
segmentGeneratorConfig.setSequenceId(sequenceId);
segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
if (_recordReaderPath != null) {
segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
segmentGeneratorConfig.setFormat(FileFormat.OTHER);
} else {
FileFormat fileFormat = getFileFormat(inputFileName);
Expand Down Expand Up @@ -276,9 +275,6 @@ protected FileFormat getFileFormat(String fileName) {
if (fileName.endsWith(".thrift")) {
return FileFormat.THRIFT;
}
if (fileName.endsWith(".parquet")) {
return FileFormat.PARQUET;
}
throw new IllegalArgumentException("Unsupported file format: {}" + fileName);
}

Expand Down
2 changes: 1 addition & 1 deletion pinot-orc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
Loading