
Improve error handling for partial aggregation pushdown #22011

Merged

Conversation


@rschlussel rschlussel commented Feb 26, 2024

Improve error handling for partial aggregation pushdown and prevent returning wrong results when footer stats should not be relied on. This covers the following cases:

  1. Aggregations have been pushed down, but the partition file format does not support aggregation pushdown (this can occur if a table is declared with a supported storage format but a partition has a different storage format). Previously, page source providers for some file formats had special handling for this case, but not all of them did.
  2. Always throw an exception if aggregations have been pushed down but partition footer stats are unreliable. Previously, if filter pushdown was enabled (used OrcSelectivePageSourceFactory), we wouldn't create an AggregatedPageSource, so you would get an error somewhere on read. If it was disabled (OrcBatchPageSourceFactory), we would create an AggregatedPageSource and the query would silently give wrong results.
  3. The unexpected state where some, but not all, columns are of AGGREGATED type.

Error handling will still be reader-dependent if both the table and partition formats support partial aggregation pushdown but the partition format does not support as many types (e.g., Parquet vs. ORC).

Description

Previously, AggregatedPageSources (which implement the execution side of partial aggregation pushdown) were created from within the selective and batch page source factories of the supported file formats. Similarly, error handling had to be repeated in the PageSourceFactory of every unsupported file format. This resulted in a fragmented implementation, and some unsupported file formats did not include proper error handling.

Additionally, partial aggregation pushdown cannot be used when footer stats are unreliable; however, handling for this was only added to one of the supported file format factories (OrcSelectivePageSourceFactory), while the others (the ORC and Parquet batch factories) could silently return wrong results. Furthermore, the handling in OrcSelectivePageSourceFactory prevented wrong results by not creating an aggregated page source, but it didn't produce a clear error message because it fell through to creating a selective page source instead.

This PR makes HiveAggregatedPageSourceFactories into a top-level concept similar to HiveSelectivePageSourceFactories and HiveBatchPageSourceFactories so that we can unify all the error handling and prevent bugs from creeping in as new file format page source factories are added.
The main logic of the change is in HivePageSourceProvider. A lot of the rest of it is scaffolding to support that.
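
To make the consolidated checks concrete, here is a minimal sketch of the validation this change centralizes in one place. It is illustrative only: the class, column, and exception types below are simplified stand-ins for the real Presto types, not the code that was actually added to HivePageSourceProvider.

import java.util.List;

final class AggregationPushdownChecks
{
    enum ColumnType { REGULAR, AGGREGATED }

    record Column(String name, ColumnType type) {}

    // Validates the three cases listed above before any page source is created.
    static void validate(List<Column> columns, boolean partitionFormatSupportsPushdown, boolean footerStatsReliable)
    {
        long aggregated = columns.stream().filter(c -> c.type() == ColumnType.AGGREGATED).count();

        // Case 3: a mix of AGGREGATED and non-AGGREGATED columns is an unexpected planner state.
        if (aggregated > 0 && aggregated < columns.size()) {
            throw new IllegalStateException("Some but not all columns are of AGGREGATED type");
        }
        if (aggregated == 0) {
            return; // nothing was pushed down; take the normal selective/batch read path
        }
        // Case 1: the partition's storage format must support aggregation pushdown,
        // even if the table's declared format does.
        if (!partitionFormatSupportsPushdown) {
            throw new UnsupportedOperationException("Partition storage format does not support partial aggregation pushdown");
        }
        // Case 2: never answer pushed-down aggregations from footer stats marked unreliable.
        if (!footerStatsReliable) {
            throw new IllegalStateException("Footer stats are unreliable; partial aggregation pushdown cannot be used");
        }
    }
}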

Motivation and Context:

  1. To ensure consistent error handling across different page sources, even as new file formats or selective reader implementations are added.
  2. To prevent wrong results when footer stats are unreliable, regardless of file format or other configuration.

This gap was discovered as part of an audit to make sure we were not assuming that partition file formats will always match table file formats.

Impact

Fix a potential wrong results bug when footer stats are marked as unreliable and aggregation pushdown is enabled. Ensure all file formats that don't support aggregation pushdown will return a clear error to the user.

Test Plan

New unit tests for HivePageSourceProvider.

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==
Hive Changes
* Fix a potential wrong results bug when footer stats are marked unreliable and partial aggregation pushdown is enabled. Such queries will now fail with an error.

@rschlussel rschlussel requested a review from a team as a code owner February 26, 2024 21:17
@rschlussel
Contributor Author

I'm still updating the tests. Don't review yet

@rschlussel rschlussel force-pushed the aggregation-pushdown-error-handling branch 3 times, most recently from 4ba2d25 to 01bfe06 on February 27, 2024 15:14
@rschlussel
Contributor Author

this is ready for review (failing tests are flaky/unrelated)

abhiseksaikia previously approved these changes Feb 29, 2024

@abhiseksaikia abhiseksaikia left a comment

LGTM % minor nit and a question

Comment on lines 179 to 196
        DataSize maxMergeDistance = getOrcMaxMergeDistance(session);
        DataSize maxBufferSize = getOrcMaxBufferSize(session);
        DataSize streamBufferSize = getOrcStreamBufferSize(session);
        DataSize tinyStripeThreshold = getOrcTinyStripeThreshold(session);
        DataSize maxReadBlockSize = getOrcMaxReadBlockSize(session);
        OrcReaderOptions orcReaderOptions = OrcReaderOptions.builder()
                .withMaxMergeDistance(maxMergeDistance)
                .withTinyStripeThreshold(tinyStripeThreshold)
                .withMaxBlockSize(maxReadBlockSize)
                .withZstdJniDecompressionEnabled(isOrcZstdJniDecompressionEnabled(session))
                .withAppendRowNumber(appendRowNumberEnabled)
                .build();
        boolean lazyReadSmallRanges = getOrcLazyReadSmallRanges(session);

        OrcDataSource orcDataSource;
        Path path = new Path(fileSplit.getPath());
        try {
            FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(session.getUser(), path, configuration).openFile(path, hiveFileContext);
            orcDataSource = new HdfsOrcDataSource(
                    new OrcDataSourceId(fileSplit.getPath()),
                    fileSplit.getFileSize(),
                    maxMergeDistance,
                    maxBufferSize,
                    streamBufferSize,
                    lazyReadSmallRanges,
                    inputStream,
                    stats);
        }
        catch (Exception e) {
            if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
                    e instanceof FileNotFoundException) {
                throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e);
            }
            throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, fileSplit.getStart(), fileSplit.getLength()), e);
        }

        OrcAggregatedMemoryContext systemMemoryUsage = new HiveOrcAggregatedMemoryContext();
        try {
            DwrfKeyProvider dwrfKeyProvider = new ProjectionBasedDwrfKeyProvider(encryptionInformation, columns, useOrcColumnNames, path);
            OrcReader reader = new OrcReader(
                    orcDataSource,
                    orcEncoding,
                    orcFileTailSource,
                    stripeMetadataSourceFactory,
                    systemMemoryUsage,
                    orcReaderOptions,
                    hiveFileContext.isCacheable(),
                    dwrfEncryptionProvider,
                    dwrfKeyProvider,
                    hiveFileContext.getStats());

            List<HiveColumnHandle> physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader.getTypes(), path);

Question: I noticed that some parts of the aggregated page source factory have logic similar to that of its respective non-aggregated page source factory. Does it make sense to refactor this duplicated code, or is it better to leave it as is and avoid introducing more complexity/refactoring?


+1
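
For context on the duplication question above: both the batch and aggregated ORC factories need the data-source setup and open-split error handling shown in the quoted snippet. The last commit in this PR ("create a utility method so we can share the error handling code between aggregated and batch page source factories") addresses this by moving it into a shared helper, and a post-merge comment below references a getOrcDataSource helper. The sketch below shows roughly what such a helper could look like; it assumes the same imports and context as the snippet above, and the parameter types are guesses rather than the exact signature.

    static OrcDataSource getOrcDataSource(
            ConnectorSession session,
            HiveFileSplit fileSplit,
            HdfsEnvironment hdfsEnvironment,
            Configuration configuration,
            HiveFileContext hiveFileContext,
            FileFormatDataSourceStats stats)
    {
        DataSize maxMergeDistance = getOrcMaxMergeDistance(session);
        DataSize maxBufferSize = getOrcMaxBufferSize(session);
        DataSize streamBufferSize = getOrcStreamBufferSize(session);
        boolean lazyReadSmallRanges = getOrcLazyReadSmallRanges(session);

        Path path = new Path(fileSplit.getPath());
        try {
            FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(session.getUser(), path, configuration).openFile(path, hiveFileContext);
            return new HdfsOrcDataSource(
                    new OrcDataSourceId(fileSplit.getPath()),
                    fileSplit.getFileSize(),
                    maxMergeDistance,
                    maxBufferSize,
                    streamBufferSize,
                    lazyReadSmallRanges,
                    inputStream,
                    stats);
        }
        catch (Exception e) {
            // Same error handling as in the snippet above, now written once for both factories.
            if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || e instanceof FileNotFoundException) {
                throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e);
            }
            throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, fileSplit.getStart(), fileSplit.getLength()), e);
        }
    }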

@ClarenceThreepwood ClarenceThreepwood left a comment

Thanks for improving on the original implementation. Overall lgtm

@@ -614,7 +653,8 @@ private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSp
return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getReadBucketNumber().getAsInt())).orElse(false);
}

private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLayoutHandle hiveLayout, DateTimeZone hiveStorageTimeZone, HiveSplit hiveSplit, SplitContext splitContext)
private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLayoutHandle hiveLayout, DateTimeZone hiveStorageTimeZone, HiveSplit hiveSplit, SplitContext

nit: this function signature and the one below

Suggested change
private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLayoutHandle hiveLayout, DateTimeZone hiveStorageTimeZone, HiveSplit hiveSplit, SplitContext
private static boolean shouldSkipPartition(TypeManager typeManager,
HiveTableLayoutHandle hiveLayout,
DateTimeZone hiveStorageTimeZone,
HiveSplit hiveSplit,
SplitContext splitContext)


@rschlussel rschlussel force-pushed the aggregation-pushdown-error-handling branch from 01bfe06 to a8c2a38 on March 4, 2024 17:52
@rschlussel
Contributor Author

Thanks for the review @abhiseksaikia and @ClarenceThreepwood. I've addressed your comments. I also split out the commits a bit, as requested by @ajaygeorge.

@ajaygeorge ajaygeorge left a comment

Consolidate error handling for ParquetPageSourceFactory a8c2a38 looks good % a nit

@@ -484,7 +454,7 @@ public static boolean checkSchemaMatch(org.apache.parquet.schema.Type parquetTyp
return prestoType.equals(BIGINT) || prestoType.equals(DECIMAL) || prestoType.equals(TIMESTAMP) || prestoType.equals(StandardTypes.REAL) || prestoType.equals(StandardTypes.DOUBLE);
case INT32:
return prestoType.equals(INTEGER) || prestoType.equals(BIGINT) || prestoType.equals(SMALLINT) || prestoType.equals(DATE) || prestoType.equals(DECIMAL) ||
prestoType.equals(TINYINT) || prestoType.equals(REAL) || prestoType.equals(StandardTypes.DOUBLE);
prestoType.equals(TINYINT) || prestoType.equals(REAL) || prestoType.equals(StandardTypes.DOUBLE);

stray space?

@ajaygeorge ajaygeorge left a comment

Remove unneeded error handling from page source factories f7fae20 looks good % some comments.

@@ -109,12 +108,6 @@ public Optional<? extends ConnectorPageSource> createPageSource(
HiveFileContext hiveFileContext,
Optional<EncryptionInformation> encryptionInformation)
{
if (!columns.isEmpty() && columns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) {
throw new UnsupportedOperationException("Partial aggregation pushdown only supported for ORC/Parquet files. " +
@ajaygeorge ajaygeorge Mar 4, 2024

Curious: where does this check move to after the refactoring? I wasn't able to find it. Is it not needed anymore?

Contributor Author

Tagged you where this check moved to. Instead of adding error handling for every file format, we do it all in one place; that's why it's not needed here anymore.

                return pageSource.get();
            }
        }
        throw new PrestoException(
Contributor Author

@ajaygeorge this is where the check moved to. If our columns are aggregated, we try to create an AggregatedPageSource by looping through all the aggregatedPageSourceFactories and returning as soon as one of them produces a page source (it's a weird way to do things, but it's how the selective and batch page sources work too). If the file format doesn't support it (i.e., we finish looping without returning), then we throw an exception.
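
A minimal sketch of the dispatch pattern described in the comment above: loop over the aggregated page source factories, return the first page source produced, and fail with a clear error if no factory handles the partition's file format. The interface and method names are simplified stand-ins, not the actual HiveAggregatedPageSourceFactory API.

import java.util.List;
import java.util.Optional;

final class AggregatedDispatchSketch
{
    interface PageSource {}

    interface AggregatedPageSourceFactory
    {
        // Returns empty if this factory does not handle the partition's file format.
        Optional<PageSource> tryCreate(String partitionFileFormat);
    }

    static PageSource createAggregatedPageSource(List<AggregatedPageSourceFactory> factories, String partitionFileFormat)
    {
        for (AggregatedPageSourceFactory factory : factories) {
            Optional<PageSource> pageSource = factory.tryCreate(partitionFileFormat);
            if (pageSource.isPresent()) {
                return pageSource.get(); // first factory that understands the format wins
            }
        }
        // No factory supports this partition's format: fail with a clear error rather
        // than silently reading raw rows for columns the planner already aggregated.
        throw new UnsupportedOperationException(
                "Partial aggregation pushdown is not supported for file format: " + partitionFileFormat);
    }
}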

ajaygeorge previously approved these changes Mar 4, 2024

@ajaygeorge ajaygeorge left a comment

Rest commits look good. LGTM

@@ -225,6 +245,39 @@ public ConnectorPageSource createPageSource(
throw new IllegalStateException("Could not find a file reader for split " + hiveSplit);
}

private ConnectorPageSource createAggregatedPageSource(Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories, Configuration configuration, ConnectorSession session, HiveSplit hiveSplit, HiveTableLayoutHandle hiveLayout, List<HiveColumnHandle> selectedColumns, HiveFileContext fileContext, Optional<EncryptionInformation> encryptionInformation)

nit. arguments on separate lines for readability.

Improve error handling for partial aggregation pushdown and prevent
returning wrong results when footer stats should not be relied on.  This
covers the following cases:
1. Aggregations have been pushed down but partition file format does not
   support aggregation pushdown (can occur if table is declared with a
   supported storage format, but partition has a different storage format).
   Previously, page source providers for some file formats had special
   handling for this case, but not all
2. Always throw an exception if aggregations have been pushed down but
   partition footer stats are unreliable. Previously, if filter pushdown
   was enabled (used OrcSelectivePageSourceFactory), we wouldn't create an
   AggregatedPageSource, so you would get an error somewhere on read. If
   it was disabled (OrcBatchPageSourceFactory), we would create an
   AggregatedPageSource and the query would silently give wrong results.
3. Unexpected state where some but not all columns are of AGGREGATED
   type.

Error handling is still going to be reader dependent if both the table
and partition format support partial aggregation pushdown, but the
partition format does not support as many types (e.g. parquet vs. orc)

Remove error handling for aggregated columns from individual page source
factories, as these errors are now handled in a consolidated place.
This commit is separate from the main commit that consolidated the error
handling for easier review.

Create a utility method so we can share the error handling code between
aggregated and batch page source factories.
@rschlussel rschlussel force-pushed the aggregation-pushdown-error-handling branch from a8c2a38 to ed7bb4b on March 5, 2024 14:35
@ajaygeorge ajaygeorge left a comment

LGTM

@abhiseksaikia abhiseksaikia left a comment

LGTM!

@ClarenceThreepwood ClarenceThreepwood left a comment

lgtm

@rschlussel rschlussel merged commit d80e49a into prestodb:master Mar 6, 2024
56 checks passed
            DwrfEncryptionProvider dwrfEncryptionProvider,
            boolean appendRowNumberEnabled)
    {
        OrcDataSource orcDataSource = getOrcDataSource(session, fileSplit, hdfsEnvironment, configuration, hiveFileContext, stats);
Collaborator

@rschlussel this is a resource leak because we don't close the orcDataSource in the happy case

Contributor Author

oh good catch. Thank you!
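
One common way to address this kind of leak is sketched below with stand-in types (java.io.Closeable standing in for OrcDataSource, and a Function standing in for page source construction). It assumes the aggregated page source no longer needs the data source once it has been built from the footer stats; this illustrates the pattern only and is not the fix that was actually applied.

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Function;

final class CloseOnAllPathsSketch
{
    static <T> T buildAndRelease(Closeable dataSource, Function<Closeable, T> buildPageSource) throws IOException
    {
        // try-with-resources closes the data source on the happy path and on failure,
        // attaching any close() failure as a suppressed exception.
        try (Closeable ignored = dataSource) {
            return buildPageSource.apply(dataSource);
        }
    }
}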
