Fix the RecordReader to pick incoming time column #3895

Merged
merged 3 commits into master from fix_record_reader
Mar 5, 2019

Conversation

Jackie-Jiang
Contributor

@Jackie-Jiang Jackie-Jiang commented Feb 28, 2019

Fix AvroRecordReader, CSVRecordReader, JSONRecordReader, ThriftRecordReader to be able to read incoming time column
Move common util methods into RecordReaderUtils to achieve:

  • If the incoming and outgoing time column names are the same, use the incoming field spec (data type)
  • If the incoming and outgoing time column names are different, try reading both of them
  • If no value is provided, do not fill in a default value for the time column (a default time column is not allowed)
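
The three rules above can be sketched roughly as follows. This is only an illustration, not the code in RecordReaderUtils: `TimeColumnRuleSketch`, `ColumnSpec`, `specsToRead`, and `readTimeColumns` are hypothetical names, and a plain `Map` stands in for both the input record and the output row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the time column rules; not Pinot's actual RecordReaderUtils code.
final class TimeColumnRuleSketch {

    /** Minimal stand-in for a field spec: a column name plus a data type label. */
    static final class ColumnSpec {
        final String name;
        final String dataType;

        ColumnSpec(String name, String dataType) {
            this.name = name;
            this.dataType = dataType;
        }
    }

    /** Returns the specs a reader should try to read for the time column. */
    static List<ColumnSpec> specsToRead(ColumnSpec incoming, ColumnSpec outgoing) {
        List<ColumnSpec> specs = new ArrayList<>();
        // The incoming spec is always tried; with identical names its data type wins.
        specs.add(incoming);
        if (!incoming.name.equals(outgoing.name)) {
            // Different names: try the outgoing column as well.
            specs.add(outgoing);
        }
        return specs;
    }

    /** Copies only the time values present in the record; missing ones get no default. */
    static Map<String, Object> readTimeColumns(Map<String, Object> record,
                                               ColumnSpec incoming, ColumnSpec outgoing) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (ColumnSpec spec : specsToRead(incoming, outgoing)) {
            Object value = record.get(spec.name);
            if (value != null) {
                row.put(spec.name, value);
            }
        }
        return row;
    }
}
```

With an incoming spec named `time_day` and an outgoing spec named `column2`, a record that only carries `column2` yields a row with just that column set, and nothing is filled in for `time_day`.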

Other fixes:

  • Fix the issue where AvroRecordReader does not convert the value into the data type from the schema
  • Support reading empty strings in all record readers (to match AvroRecordReader behavior)
  • Make ThriftRecordReader behave the same as the other record readers
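
As a rough illustration of the first two fixes, a value read from a record can be coerced to the data type declared in the schema, with an empty string passed through unchanged for string columns. The class `ValueConversionSketch`, its `DataType` enum, and `convert` are hypothetical and only assume the general idea; how the real readers treat an empty string for numeric columns is not shown in this PR description, so the sketch simply lets the parse fail in that case.

```java
// Hypothetical sketch of schema-driven value conversion; not Pinot's actual code.
final class ValueConversionSketch {

    /** Simplified set of data types, assumed for illustration. */
    enum DataType { INT, LONG, FLOAT, DOUBLE, STRING }

    /** Converts a non-null raw value into the Java type matching the schema data type. */
    static Object convert(Object value, DataType dataType) {
        if (value instanceof Number) {
            Number number = (Number) value;
            switch (dataType) {
                case INT:
                    return number.intValue();
                case LONG:
                    return number.longValue();
                case FLOAT:
                    return number.floatValue();
                case DOUBLE:
                    return number.doubleValue();
                default:
                    return number.toString();
            }
        }
        // Non-numeric inputs (including CharSequence values) are handled via their string form.
        String text = value.toString();
        switch (dataType) {
            case INT:
                return Integer.parseInt(text);
            case LONG:
                return Long.parseLong(text);
            case FLOAT:
                return Float.parseFloat(text);
            case DOUBLE:
                return Double.parseDouble(text);
            default:
                // STRING: an empty string is a valid value and is returned as-is.
                return text;
        }
    }
}
```

For example, an `Integer` read from the file for a column declared as LONG comes back as a `Long`, and `""` for a STRING column stays `""`.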

Add tests for all scenarios

@Jackie-Jiang Jackie-Jiang force-pushed the fix_record_reader branch 2 times, most recently from 08f4933 to da22f6f Compare February 28, 2019 23:07
@codecov-io

codecov-io commented Feb 28, 2019

Codecov Report

Merging #3895 into master will decrease coverage by 0.02%.
The diff coverage is 82.3%.

@@             Coverage Diff              @@
##             master    #3895      +/-   ##
============================================
- Coverage     67.23%   67.21%   -0.03%     
  Complexity        4        4              
============================================
  Files          1032     1032              
  Lines         50897    50884      -13     
  Branches       7109     7108       -1     
============================================
- Hits          34220    34200      -20     
- Misses        14339    14344       +5     
- Partials       2338     2340       +2
| Impacted Files | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| ...ain/java/org/apache/pinot/core/util/AvroUtils.java | 41.48% <ø> (-11.41%) | 0 <0> (ø) | |
| ...e/pinot/core/data/readers/RecordReaderFactory.java | 42.85% <0%> (ø) | 0 <0> (ø) | ⬇️ |
| ...ache/pinot/core/data/readers/JSONRecordReader.java | 90% <100%> (-0.91%) | 0 <0> (ø) | |
| ...time/impl/kafka/AvroRecordToPinotRowGenerator.java | 100% <100%> (ø) | 0 <0> (ø) | ⬇️ |
| ...ache/pinot/core/data/readers/AvroRecordReader.java | 85.1% <100%> (+7.05%) | 0 <0> (ø) | ⬇️ |
| ...che/pinot/core/data/readers/RecordReaderUtils.java | 75.3% <77.94%> (+6.07%) | 0 <0> (ø) | ⬇️ |
| ...he/pinot/core/data/readers/ThriftRecordReader.java | 83.67% <83.33%> (+2.13%) | 0 <0> (ø) | ⬇️ |
| ...pache/pinot/core/data/readers/CSVRecordReader.java | 71.15% <88.88%> (+2.52%) | 0 <0> (ø) | ⬇️ |
| ...startree/executor/StarTreeAggregationExecutor.java | 72.72% <0%> (-27.28%) | 0% <0%> (ø) | |
| ...a/manager/realtime/RealtimeSegmentDataManager.java | 75% <0%> (-25%) | 0% <0%> (ø) | |
| ... and 32 more | | | |


@snleee
Contributor

snleee commented Mar 1, 2019

Out of curiosity, why do we allow different incoming and outgoing time column names? What would be the use case that requires this feature? I think that it's better to enforce those two column names to be the same.

if (isPinotFieldSingleValue != isAvroFieldSingleValue) {
String errorMessage = "Pinot field: " + fieldName + " is " + (isPinotFieldSingleValue ? "Single" : "Multi")
+ "-valued in Pinot schema but not in Avro schema";
if (fieldSpec.getFieldType() == FieldSpec.FieldType.TIME) {
Contributor

@snleee snleee Mar 1, 2019

Is there a way to avoid repeating this validation logic in each record reader? It makes it hard for people to add a new type of record reader. For instance, an outside contributor is working on adding a Parquet reader in #3852.

One approach would be to always try to read both the incoming and outgoing time column values and do the validation based on GenericRow.

Contributor Author

Done. Most of the common logic is now maintained in RecordReaderUtils.

@@ -43,6 +43,7 @@ public static RecordReader getRecordReader(SegmentGeneratorConfig segmentGenerat
return new CSVRecordReader(dataFile, schema, (CSVRecordReaderConfig) segmentGeneratorConfig.getReaderConfig());
case JSON:
return new JSONRecordReader(dataFile, schema);
// TODO: PinotSegmentRecordReader abd ThriftRecordReader do not support default value or different incoming/outgoing time column
Contributor

abd -> and

@Jackie-Jiang
Contributor Author

@snleee For example, if the incoming time column is named millisSinceEpoch with unit MILLISECONDS, it is confusing to convert this column to DAYS without changing the name to daysSinceEpoch.
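
To make the naming argument concrete, here is a small sketch of the unit conversion using the JDK's `java.util.concurrent.TimeUnit`. The class name `TimeUnitRenameSketch` and the helper `convert` are hypothetical; the sample timestamp is chosen only for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the same instant expressed in two units deserves two column names.
final class TimeUnitRenameSketch {

    /** Converts a time value from the incoming unit to the outgoing unit (truncating). */
    static long convert(long incomingValue, TimeUnit incomingUnit, TimeUnit outgoingUnit) {
        return outgoingUnit.convert(incomingValue, incomingUnit);
    }

    public static void main(String[] args) {
        long millisSinceEpoch = 1551744000000L; // 2019-03-05T00:00:00Z
        long daysSinceEpoch = convert(millisSinceEpoch, TimeUnit.MILLISECONDS, TimeUnit.DAYS);
        System.out.println(daysSinceEpoch); // prints 17960
    }
}
```

Storing 17960 under a column still called millisSinceEpoch would be misleading, which is the case for allowing a different outgoing column name.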

@Jackie-Jiang Jackie-Jiang force-pushed the fix_record_reader branch 2 times, most recently from cc79eef to a61c1ea Compare March 2, 2019 01:16
Member

@jackjlli jackjlli left a comment

It seems we need to handle this in all the concrete classes of RecordReader. Can we leave some comments in the API doc of RecordReader, so that when we add more concrete classes, we follow the same convention?

@Jackie-Jiang
Contributor Author

@jackjlli Added javadoc to RecordReader interface. Thanks for the suggestion.

.addSingleValueDimension("unknown_dimension", FieldSpec.DataType.STRING)
.addMetric("met_impressionCount", FieldSpec.DataType.LONG).addMetric("unknown_metric", FieldSpec.DataType.DOUBLE)
.build();
private final Schema SCHEMA_SAME_INCOMING_OUTGOING = new Schema.SchemaBuilder()
Member

I'm not quite sure about the logic here. Shouldn't SCHEMA_SAME_INCOMING_OUTGOING have the same TimeUnit? Here one is in DAYS and the other is in SECONDS. Or could you add some comments here?

Contributor Author

Added comments

private final Schema SCHEMA_DIFFERENT_INCOMING_OUTGOING = new Schema.SchemaBuilder()
.addTime("time_day", TimeUnit.SECONDS, FieldSpec.DataType.LONG, "column2", TimeUnit.DAYS, FieldSpec.DataType.INT)
.build();
private final Schema SCHEMA_NO_INCOMING = new Schema.SchemaBuilder()
Member

Same here. If the schema has no incoming column, why do we need "incoming", TimeUnit.SECONDS, FieldSpec.DataType.LONG?

Contributor

@snleee snleee left a comment

LGTM otherwise. Thank you for making the change to have a shared check for the incoming and outgoing time field specs. This looks much cleaner.

value = RecordReaderUtils.convertToDataTypeArray((ArrayList) jsonValue, fieldSpec);
Object value = record.get(fieldName);
// Allow default value for non-time columns
if (value != null || fieldSpec.getFieldType() != FieldSpec.FieldType.TIME) {
Contributor

Instead of ignoring null time column values, maybe it's better to throw an exception? What do you think?

Contributor Author

We cannot throw an exception here because it is valid for one of the incoming or outgoing columns to be missing from the record. The time converter can catch this and throw an exception if no time column is set.
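
A minimal sketch of that division of labor, assuming the reader leaves missing time values unset and a later conversion step decides whether the row is usable. `TimeConverterSketch` and `outgoingTime` are hypothetical names, a `Map` stands in for the row, and preferring the incoming value when both are present is an assumption of this sketch rather than something stated in the thread.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a time converter that rejects rows with no time value at all.
final class TimeConverterSketch {

    /** Resolves the outgoing time value from whichever time column the reader managed to set. */
    static long outgoingTime(Map<String, Object> row,
                             String incomingName, TimeUnit incomingUnit,
                             String outgoingName, TimeUnit outgoingUnit) {
        Object incomingValue = row.get(incomingName);
        if (incomingValue != null) {
            // Incoming value present: convert it into the outgoing unit.
            return outgoingUnit.convert(((Number) incomingValue).longValue(), incomingUnit);
        }
        Object outgoingValue = row.get(outgoingName);
        if (outgoingValue != null) {
            // Only the outgoing column was in the record: it is already in the outgoing unit.
            return ((Number) outgoingValue).longValue();
        }
        // Neither column was set by the reader, so the row has no usable time value.
        throw new IllegalStateException(
            "No time value found in columns '" + incomingName + "' or '" + outgoingName + "'");
    }
}
```

A record holding only the outgoing column passes through untouched, a record holding only the incoming column is converted, and only a record with neither causes a failure.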

Member

@jackjlli jackjlli left a comment

LGTM. Thanks for working on this.

@Jackie-Jiang Jackie-Jiang merged commit a7a8bb6 into master Mar 5, 2019
@Jackie-Jiang Jackie-Jiang deleted the fix_record_reader branch March 5, 2019 22:20
4 participants