Improve error handling for partial aggregation pushdown #22011

Merged
@@ -83,6 +83,7 @@
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION;
import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveSelectivePageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultS3HiveRecordCursorProvider;
@@ -199,6 +200,7 @@ public S3SelectTestHelper(String host,
getDefaultS3HiveRecordCursorProvider(config, metastoreClientConfig),
getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig),
getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig),
getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig),
FUNCTION_AND_TYPE_MANAGER,
ROW_EXPRESSION_SERVICE);
}
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.hadoop.conf.Configuration;

import java.util.List;
import java.util.Optional;

public interface HiveAggregatedPageSourceFactory
{
Optional<? extends ConnectorPageSource> createPageSource(
Configuration configuration,
ConnectorSession session,
HiveFileSplit fileSplit,
Storage storage,
List<HiveColumnHandle> columns,
HiveFileContext hiveFileContext,
Optional<EncryptionInformation> encryptionInformation,
boolean appendRowNumberEnabled);
}
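
For context, here is a minimal sketch of how a format-specific implementation of this new interface might look. The class name, serde name, and body are hypothetical and not part of this PR; the key contract is that a factory which cannot handle the split's file format returns Optional.empty(), so HivePageSourceProvider can keep probing the remaining factories and raise HIVE_UNSUPPORTED_FORMAT if none match.

package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import org.apache.hadoop.conf.Configuration;

import java.util.List;
import java.util.Optional;

// Illustrative only; the real implementations wired up in this PR are
// OrcAggregatedPageSourceFactory, DwrfAggregatedPageSourceFactory, and ParquetAggregatedPageSourceFactory.
public class ExampleAggregatedPageSourceFactory
        implements HiveAggregatedPageSourceFactory
{
    // Hypothetical serde name, used only for this sketch.
    private static final String SUPPORTED_SERDE = "com.example.hive.ExampleSerDe";

    @Override
    public Optional<? extends ConnectorPageSource> createPageSource(
            Configuration configuration,
            ConnectorSession session,
            HiveFileSplit fileSplit,
            Storage storage,
            List<HiveColumnHandle> columns,
            HiveFileContext hiveFileContext,
            Optional<EncryptionInformation> encryptionInformation,
            boolean appendRowNumberEnabled)
    {
        // Returning empty tells the caller this factory cannot serve the split's format.
        if (!SUPPORTED_SERDE.equals(storage.getStorageFormat().getSerDe())) {
            return Optional.empty();
        }
        // A real factory would build a page source here that answers the AGGREGATED
        // columns from file footer statistics; this sketch simply declines.
        return Optional.empty();
    }
}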
@@ -27,13 +27,16 @@
import com.facebook.presto.hive.metastore.HiveMetastoreCacheStats;
import com.facebook.presto.hive.metastore.HivePartitionMutator;
import com.facebook.presto.hive.metastore.MetastoreCacheStats;
import com.facebook.presto.hive.orc.DwrfAggregatedPageSourceFactory;
import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory;
import com.facebook.presto.hive.orc.DwrfSelectivePageSourceFactory;
import com.facebook.presto.hive.orc.OrcAggregatedPageSourceFactory;
import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory;
import com.facebook.presto.hive.orc.OrcSelectivePageSourceFactory;
import com.facebook.presto.hive.orc.TupleDomainFilterCache;
import com.facebook.presto.hive.pagefile.PageFilePageSourceFactory;
import com.facebook.presto.hive.pagefile.PageFileWriterFactory;
import com.facebook.presto.hive.parquet.ParquetAggregatedPageSourceFactory;
import com.facebook.presto.hive.parquet.ParquetFileWriterFactory;
import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
import com.facebook.presto.hive.parquet.ParquetSelectivePageSourceFactory;
@@ -209,6 +212,11 @@ public void configure(Binder binder)
selectivePageSourceFactoryBinder.addBinding().to(DwrfSelectivePageSourceFactory.class).in(Scopes.SINGLETON);
selectivePageSourceFactoryBinder.addBinding().to(ParquetSelectivePageSourceFactory.class).in(Scopes.SINGLETON);

Multibinder<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactoryBinder = newSetBinder(binder, HiveAggregatedPageSourceFactory.class);
aggregatedPageSourceFactoryBinder.addBinding().to(OrcAggregatedPageSourceFactory.class).in(Scopes.SINGLETON);
aggregatedPageSourceFactoryBinder.addBinding().to(DwrfAggregatedPageSourceFactory.class).in(Scopes.SINGLETON);
aggregatedPageSourceFactoryBinder.addBinding().to(ParquetAggregatedPageSourceFactory.class).in(Scopes.SINGLETON);

binder.bind(DataSinkFactory.class).to(OutputStreamDataSinkFactory.class).in(Scopes.SINGLETON);

Multibinder<HiveFileWriterFactory> fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class);
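Each addBinding() above contributes one factory to a Guice multibinding, so the whole group is injected as a single Set<HiveAggregatedPageSourceFactory> into the HivePageSourceProvider constructor shown further down in this diff. Below is a self-contained sketch of that pattern with toy types; all names here are invented for illustration.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

import java.util.Set;

public final class MultibindingSketch
{
    interface PageSourceFactory
    {
        String formatName();
    }

    static class OrcLikeFactory implements PageSourceFactory
    {
        public String formatName() { return "orc"; }
    }

    static class ParquetLikeFactory implements PageSourceFactory
    {
        public String formatName() { return "parquet"; }
    }

    // Stands in for HivePageSourceProvider: it receives every bound factory as one Set.
    static class Provider
    {
        final Set<PageSourceFactory> factories;

        @Inject
        Provider(Set<PageSourceFactory> factories)
        {
            this.factories = factories;
        }
    }

    static class FactoryModule extends AbstractModule
    {
        @Override
        protected void configure()
        {
            // Same shape as the aggregatedPageSourceFactoryBinder block in the diff above.
            Multibinder<PageSourceFactory> factoryBinder = Multibinder.newSetBinder(binder(), PageSourceFactory.class);
            factoryBinder.addBinding().to(OrcLikeFactory.class).in(Scopes.SINGLETON);
            factoryBinder.addBinding().to(ParquetLikeFactory.class).in(Scopes.SINGLETON);
        }
    }

    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new FactoryModule());
        Provider provider = injector.getInstance(Provider.class);
        System.out.println(provider.factories.size()); // prints 2
    }
}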
@@ -69,6 +69,7 @@
import static com.facebook.presto.hive.HiveCoercer.createCoercer;
import static com.facebook.presto.hive.HiveColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles;
import static com.facebook.presto.hive.HiveSessionProperties.isUseRecordPageSourceForCustomSplit;
import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue;
@@ -97,6 +98,7 @@ public class HivePageSourceProvider
private final Set<HiveRecordCursorProvider> cursorProviders;
private final Set<HiveBatchPageSourceFactory> pageSourceFactories;
private final Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories;
private Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories;
private final TypeManager typeManager;
private final RowExpressionService rowExpressionService;
private final LoadingCache<RowExpressionCacheKey, RowExpression> optimizedRowExpressionCache;
@@ -108,6 +110,7 @@ public HivePageSourceProvider(
Set<HiveRecordCursorProvider> cursorProviders,
Set<HiveBatchPageSourceFactory> pageSourceFactories,
Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories,
Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories,
TypeManager typeManager,
RowExpressionService rowExpressionService)
{
@@ -117,6 +120,7 @@ public HivePageSourceProvider(
this.cursorProviders = ImmutableSet.copyOf(requireNonNull(cursorProviders, "cursorProviders is null"));
this.pageSourceFactories = ImmutableSet.copyOf(requireNonNull(pageSourceFactories, "pageSourceFactories is null"));
this.selectivePageSourceFactories = ImmutableSet.copyOf(requireNonNull(selectivePageSourceFactories, "selectivePageSourceFactories is null"));
this.aggregatedPageSourceFactories = ImmutableSet.copyOf(requireNonNull(aggregatedPageSourceFactories, "aggregatedPageSourceFactories is null"));
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.optimizedRowExpressionCache = CacheBuilder.newBuilder()
@@ -153,6 +157,30 @@ public ConnectorPageSource createPageSource(
path);

Optional<EncryptionInformation> encryptionInformation = hiveSplit.getEncryptionInformation();
CacheQuota cacheQuota = generateCacheQuota(hiveSplit);
HiveFileContext fileContext = new HiveFileContext(
splitContext.isCacheable(),
cacheQuota,
hiveSplit.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new),
OptionalLong.of(hiveSplit.getFileSplit().getFileSize()),
OptionalLong.of(hiveSplit.getFileSplit().getStart()),
OptionalLong.of(hiveSplit.getFileSplit().getLength()),
hiveSplit.getFileSplit().getFileModifiedTime(),
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session));

if (columns.stream().anyMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED))) {
checkArgument(columns.stream().allMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED)), "Not all columns are of 'AGGREGATED' type");

if (hiveLayout.isFooterStatsUnreliable()) {
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Partial aggregation pushdown is not supported when footer stats are unreliable. " +
"Table %s has file %s with unreliable footer stats. " +
"Set session property [catalog-name].pushdown_partial_aggregations_into_scan=false and execute query again.",
hiveLayout.getSchemaTableName(),
hiveSplit.getFileSplit().getPath()));
}

return createAggregatedPageSource(aggregatedPageSourceFactories, configuration, session, hiveSplit, hiveLayout, selectedColumns, fileContext, encryptionInformation);
}
if (hiveLayout.isPushdownFilterEnabled()) {
Optional<ConnectorPageSource> selectivePageSource = createSelectivePageSource(
selectivePageSourceFactories,
@@ -165,6 +193,7 @@ public ConnectorPageSource createPageSource(
typeManager,
optimizedRowExpressionCache,
splitContext,
fileContext,
encryptionInformation);
if (selectivePageSource.isPresent()) {
return selectivePageSource.get();
@@ -183,7 +212,6 @@ public ConnectorPageSource createPageSource(
return new HiveEmptySplitPageSource();
}

CacheQuota cacheQuota = generateCacheQuota(hiveSplit);
Optional<ConnectorPageSource> pageSource = createHivePageSource(
cursorProviders,
pageSourceFactories,
@@ -206,15 +234,7 @@
hiveSplit.getTableToPartitionMapping(),
hiveSplit.getBucketConversion(),
hiveSplit.isS3SelectPushdownEnabled(),
new HiveFileContext(
splitContext.isCacheable(),
cacheQuota,
hiveSplit.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new),
OptionalLong.of(hiveSplit.getFileSplit().getFileSize()),
OptionalLong.of(hiveSplit.getFileSplit().getStart()),
OptionalLong.of(hiveSplit.getFileSplit().getLength()),
hiveSplit.getFileSplit().getFileModifiedTime(),
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session)),
fileContext,
hiveLayout.getRemainingPredicate(),
hiveLayout.isPushdownFilterEnabled(),
rowExpressionService,
@@ -225,6 +245,47 @@
throw new IllegalStateException("Could not find a file reader for split " + hiveSplit);
}

private ConnectorPageSource createAggregatedPageSource(
Set<HiveAggregatedPageSourceFactory> aggregatedPageSourceFactories,
Configuration configuration,
ConnectorSession session,
HiveSplit hiveSplit,
HiveTableLayoutHandle hiveLayout,
List<HiveColumnHandle> selectedColumns,
HiveFileContext fileContext,
Optional<EncryptionInformation> encryptionInformation)
{
for (HiveAggregatedPageSourceFactory pageSourceFactory : aggregatedPageSourceFactories) {
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
hiveSplit.getPartitionKeys(),
selectedColumns,
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
hiveSplit.getFileSplit(),
hiveSplit.getTableBucketNumber());

List<ColumnMapping> regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings);
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
session,
hiveSplit.getFileSplit(),
hiveSplit.getStorage(),
toColumnHandles(regularAndInterimColumnMappings, true),
fileContext,
encryptionInformation,
hiveLayout.isAppendRowNumberEnabled());
if (pageSource.isPresent()) {
return pageSource.get();
}
}
throw new PrestoException(
Review comment from the PR author: @ajaygeorge this is where the check is moved to. If the columns are aggregated, we try to create an aggregated page source by looping through all the aggregatedPageSourceFactories and returning as soon as one produces a page source (an odd pattern, but it matches how the selective and batch page sources work). If the file format doesn't support it, i.e. the loop finishes without returning, then we throw this exception.

HIVE_UNSUPPORTED_FORMAT,
format("Table %s has file of format %s that does not support partial aggregation pushdown. " +
"Set session property [catalog-name].pushdown_partial_aggregations_into_scan=false and execute query again.",
hiveLayout.getSchemaTableName(),
hiveSplit.getStorage().getStorageFormat().getSerDe()));
}

@VisibleForTesting
protected static CacheQuota generateCacheQuota(HiveSplit hiveSplit)
{
@@ -254,6 +315,7 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(
TypeManager typeManager,
LoadingCache<RowExpressionCacheKey, RowExpression> rowExpressionCache,
SplitContext splitContext,
HiveFileContext fileContext,
Optional<EncryptionInformation> encryptionInformation)
{
Set<HiveColumnHandle> interimColumns = ImmutableSet.<HiveColumnHandle>builder()
@@ -302,7 +364,6 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(
return Optional.of(new HiveEmptySplitPageSource());
}

CacheQuota cacheQuota = generateCacheQuota(split);
for (HiveSelectivePageSourceFactory pageSourceFactory : selectivePageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
configuration,
@@ -318,18 +379,9 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(
handle -> new Subfield(((HiveColumnHandle) handle).getName())).intersect(layout.getDomainPredicate())).orElse(layout.getDomainPredicate()),
optimizedRemainingPredicate,
hiveStorageTimeZone,
new HiveFileContext(
splitContext.isCacheable(),
cacheQuota,
split.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new),
OptionalLong.of(split.getFileSplit().getFileSize()),
OptionalLong.of(split.getFileSplit().getStart()),
OptionalLong.of(split.getFileSplit().getLength()),
split.getFileSplit().getFileModifiedTime(),
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session)),
fileContext,
encryptionInformation,
layout.isAppendRowNumberEnabled(),
layout.isFooterStatsUnreliable());
layout.isAppendRowNumberEnabled());
if (pageSource.isPresent()) {
return Optional.of(pageSource.get());
}
@@ -516,12 +568,6 @@ private static Optional<ConnectorPageSource> getPageSourceFromCursorProvider(
List<ColumnMapping> regularAndInterimColumnMappings,
Optional<BucketAdaptation> bucketAdaptation)
{
if (!hiveColumns.isEmpty() && hiveColumns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) {
throw new UnsupportedOperationException("Partial aggregation pushdown only supported for ORC/Parquet files. " +
"Table " + tableName.toString() + " has file (" + fileSplit.getPath() + ") of format " + storage.getStorageFormat().getOutputFormat() +
". Set session property hive.pushdown_partial_aggregations_into_scan=false and execute query again");
}

for (HiveRecordCursorProvider provider : cursorProviders) {
// GenericHiveRecordCursor will automatically do the coercion without HiveCoercionRecordCursor
boolean doCoercion = !(provider instanceof GenericHiveRecordCursorProvider);
@@ -43,6 +43,5 @@ Optional<? extends ConnectorPageSource> createPageSource(
DateTimeZone hiveStorageTimeZone,
HiveFileContext hiveFileContext,
Optional<EncryptionInformation> encryptionInformation,
boolean appendRowNumberEnabled,
boolean footerStatsUnreliable);
boolean appendRowNumberEnabled);
}