From 2fae585f9fe4d2c2e7ceb55b1b6491468c890572 Mon Sep 17 00:00:00 2001 From: Rebecca Schlussel Date: Tue, 27 Feb 2024 10:13:15 -0500 Subject: [PATCH 1/5] Fix default invoker view test The test needs to drop the views at the end. --- .../facebook/presto/tests/AbstractTestDistributedQueries.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java index 0f1f6af379e1f..57dd8a4e54ce2 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java @@ -1132,6 +1132,8 @@ public void testViewAccessControlInvokerDefault() assertAccessAllowed( "SELECT * FROM test_view_access1", privilege(getSession().getUser(), "orders", SELECT_COLUMN)); + assertAccessAllowed(viewOwnerSession, "DROP VIEW test_view_access"); + assertAccessAllowed(viewOwnerSession, "DROP VIEW test_view_access1"); } @Test From 4c07688dffa1bf10e99960542ae4b7a754c36d53 Mon Sep 17 00:00:00 2001 From: Rebecca Schlussel Date: Mon, 4 Mar 2024 11:33:47 -0500 Subject: [PATCH 2/5] Refactor OrcPageSourceFactories to share code --- .../hive/orc/DwrfBatchPageSourceFactory.java | 11 +- .../hive/orc/OrcBatchPageSourceFactory.java | 90 +++---------- .../hive/orc/OrcPageSourceFactoryUtils.java | 126 ++++++++++++++++++ .../orc/OrcSelectivePageSourceFactory.java | 73 ++-------- .../presto/hive/TestHiveFileFormats.java | 10 +- .../TestOrcBatchPageSourceMemoryTracking.java | 2 - .../presto/hive/benchmark/FileFormat.java | 1 - 7 files changed, 163 insertions(+), 150 deletions(-) create mode 100644 presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java 
b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java index ed9f10662bc56..4ff6ad7a8862d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfBatchPageSourceFactory.java @@ -44,11 +44,8 @@ import java.util.Map; import java.util.Optional; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcLazyReadSmallRanges; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxBufferSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxReadBlockSize; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcStreamBufferSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcTinyStripeThreshold; import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcZstdJniDecompressionEnabled; import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; @@ -114,7 +111,6 @@ public Optional createPageSource( return Optional.of(createOrcPageSource( DWRF, hdfsEnvironment, - session.getUser(), configuration, fileSplit, columns, @@ -122,10 +118,6 @@ public Optional createPageSource( effectivePredicate, hiveStorageTimeZone, typeManager, - functionResolution, - getOrcMaxBufferSize(session), - getOrcStreamBufferSize(session), - getOrcLazyReadSmallRanges(session), false, stats, domainCompactionThreshold, @@ -139,6 +131,7 @@ public Optional createPageSource( .withZstdJniDecompressionEnabled(isOrcZstdJniDecompressionEnabled(session)) .build(), encryptionInformation, - dwrfEncryptionProvider)); + dwrfEncryptionProvider, + session)); } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java index 
0083c96e04ce1..208869ec6b4e7 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java @@ -27,11 +27,9 @@ import com.facebook.presto.hive.HiveOrcAggregatedMemoryContext; import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.orc.DwrfEncryptionProvider; -import com.facebook.presto.orc.DwrfKeyProvider; import com.facebook.presto.orc.OrcAggregatedMemoryContext; import com.facebook.presto.orc.OrcBatchRecordReader; import com.facebook.presto.orc.OrcDataSource; -import com.facebook.presto.orc.OrcDataSourceId; import com.facebook.presto.orc.OrcEncoding; import com.facebook.presto.orc.OrcPredicate; import com.facebook.presto.orc.OrcReader; @@ -43,21 +41,17 @@ import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.FixedPageSource; -import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import io.airlift.units.DataSize; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.joda.time.DateTimeZone; import javax.inject.Inject; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; import java.util.Map; @@ -65,30 +59,25 @@ import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcLazyReadSmallRanges; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxBufferSize; import static 
com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxReadBlockSize; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcStreamBufferSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcTinyStripeThreshold; import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcBloomFiltersEnabled; import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcZstdJniDecompressionEnabled; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcDataSource; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcReader; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.mapToPrestoException; import static com.facebook.presto.orc.DwrfEncryptionProvider.NO_ENCRYPTION; import static com.facebook.presto.orc.OrcEncoding.ORC; import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.nullToEmpty; -import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class OrcBatchPageSourceFactory implements HiveBatchPageSourceFactory { private final TypeManager typeManager; - private final StandardFunctionResolution functionResolution; private final boolean useOrcColumnNames; private final HdfsEnvironment hdfsEnvironment; private final FileFormatDataSourceStats stats; @@ -108,7 +97,6 @@ public OrcBatchPageSourceFactory( { this( typeManager, - functionResolution, requireNonNull(config, "hiveClientConfig is null").isUseOrcColumnNames(), hdfsEnvironment, stats, @@ -119,7 +107,6 @@ public 
OrcBatchPageSourceFactory( public OrcBatchPageSourceFactory( TypeManager typeManager, - StandardFunctionResolution functionResolution, boolean useOrcColumnNames, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, @@ -128,7 +115,6 @@ public OrcBatchPageSourceFactory( StripeMetadataSourceFactory stripeMetadataSourceFactory) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); this.useOrcColumnNames = useOrcColumnNames; this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.stats = requireNonNull(stats, "stats is null"); @@ -163,7 +149,6 @@ public Optional createPageSource( return Optional.of(createOrcPageSource( ORC, hdfsEnvironment, - session.getUser(), configuration, fileSplit, columns, @@ -171,10 +156,6 @@ public Optional createPageSource( effectivePredicate, hiveStorageTimeZone, typeManager, - functionResolution, - getOrcMaxBufferSize(session), - getOrcStreamBufferSize(session), - getOrcLazyReadSmallRanges(session), isOrcBloomFiltersEnabled(session), stats, domainCompactionThreshold, @@ -188,13 +169,13 @@ public Optional createPageSource( .withZstdJniDecompressionEnabled(isOrcZstdJniDecompressionEnabled(session)) .build(), encryptionInformation, - NO_ENCRYPTION)); + NO_ENCRYPTION, + session)); } public static ConnectorPageSource createOrcPageSource( OrcEncoding orcEncoding, HdfsEnvironment hdfsEnvironment, - String sessionUser, Configuration configuration, HiveFileSplit fileSplit, List columns, @@ -202,10 +183,6 @@ public static ConnectorPageSource createOrcPageSource( TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone, TypeManager typeManager, - StandardFunctionResolution functionResolution, - DataSize maxBufferSize, - DataSize streamBufferSize, - boolean lazyReadSmallRanges, boolean orcBloomFiltersEnabled, FileFormatDataSourceStats stats, int domainCompactionThreshold, @@ -214,47 +191,28 
@@ public static ConnectorPageSource createOrcPageSource( HiveFileContext hiveFileContext, OrcReaderOptions orcReaderOptions, Optional encryptionInformation, - DwrfEncryptionProvider dwrfEncryptionProvider) + DwrfEncryptionProvider dwrfEncryptionProvider, + ConnectorSession session) { checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1"); - OrcDataSource orcDataSource; + OrcDataSource orcDataSource = getOrcDataSource(session, fileSplit, hdfsEnvironment, configuration, hiveFileContext, stats); Path path = new Path(fileSplit.getPath()); - try { - FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(sessionUser, path, configuration).openFile(path, hiveFileContext); - - orcDataSource = new HdfsOrcDataSource( - new OrcDataSourceId(fileSplit.getPath()), - fileSplit.getFileSize(), - orcReaderOptions.getMaxMergeDistance(), - maxBufferSize, - streamBufferSize, - lazyReadSmallRanges, - inputStream, - stats); - } - catch (Exception e) { - if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || - e instanceof FileNotFoundException) { - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e); - } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, fileSplit), e); - } OrcAggregatedMemoryContext systemMemoryUsage = new HiveOrcAggregatedMemoryContext(); try { - DwrfKeyProvider dwrfKeyProvider = new ProjectionBasedDwrfKeyProvider(encryptionInformation, columns, useOrcColumnNames, path); - OrcReader reader = new OrcReader( - orcDataSource, + OrcReader reader = getOrcReader( orcEncoding, + columns, + useOrcColumnNames, orcFileTailSource, stripeMetadataSourceFactory, - new HiveOrcAggregatedMemoryContext(), + hiveFileContext, orcReaderOptions, - hiveFileContext.isCacheable(), + encryptionInformation, dwrfEncryptionProvider, - dwrfKeyProvider, - hiveFileContext.getStats()); + orcDataSource, + path); List physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader.getTypes(), path); 
ImmutableMap.Builder includedColumns = ImmutableMap.builder(); @@ -297,23 +255,7 @@ public static ConnectorPageSource createOrcPageSource( } catch (IOException ignored) { } - if (e instanceof PrestoException) { - throw (PrestoException) e; - } - String message = splitError(e, fileSplit); - if (e.getClass().getSimpleName().equals("BlockMissingException")) { - throw new PrestoException(HIVE_MISSING_DATA, message, e); - } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + throw mapToPrestoException(e, path, fileSplit); } } - - private static String splitError(Throwable t, HiveFileSplit fileSplit) - { - return format("Error opening Hive split %s (offset=%s, length=%s): %s", - fileSplit.getPath(), - fileSplit.getStart(), - fileSplit.getLength(), - t.getMessage()); - } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java new file mode 100644 index 0000000000000..1e3bae934eb12 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.facebook.presto.hive.orc; + +import com.facebook.presto.hive.EncryptionInformation; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.HiveFileContext; +import com.facebook.presto.hive.HiveFileSplit; +import com.facebook.presto.hive.HiveOrcAggregatedMemoryContext; +import com.facebook.presto.orc.DwrfEncryptionProvider; +import com.facebook.presto.orc.DwrfKeyProvider; +import com.facebook.presto.orc.OrcDataSource; +import com.facebook.presto.orc.OrcDataSourceId; +import com.facebook.presto.orc.OrcEncoding; +import com.facebook.presto.orc.OrcReader; +import com.facebook.presto.orc.OrcReaderOptions; +import com.facebook.presto.orc.StripeMetadataSourceFactory; +import com.facebook.presto.orc.cache.OrcFileTailSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.airlift.units.DataSize; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcLazyReadSmallRanges; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxBufferSize; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcStreamBufferSize; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; +import static com.google.common.base.Strings.nullToEmpty; +import static java.lang.String.format; + +public class 
OrcPageSourceFactoryUtils +{ + private OrcPageSourceFactoryUtils() {} + + public static HdfsOrcDataSource getOrcDataSource(ConnectorSession session, HiveFileSplit fileSplit, HdfsEnvironment hdfsEnvironment, Configuration configuration, HiveFileContext hiveFileContext, FileFormatDataSourceStats stats) + { + DataSize maxMergeDistance = getOrcMaxMergeDistance(session); + DataSize maxBufferSize = getOrcMaxBufferSize(session); + DataSize streamBufferSize = getOrcStreamBufferSize(session); + boolean lazyReadSmallRanges = getOrcLazyReadSmallRanges(session); + + Path path = new Path(fileSplit.getPath()); + try { + FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(session.getUser(), path, configuration).openFile(path, hiveFileContext); + return new HdfsOrcDataSource( + new OrcDataSourceId(fileSplit.getPath()), + fileSplit.getFileSize(), + maxMergeDistance, + maxBufferSize, + streamBufferSize, + lazyReadSmallRanges, + inputStream, + stats); + } + catch (Exception e) { + if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || + e instanceof FileNotFoundException) { + throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e); + } + throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, fileSplit.getStart(), fileSplit.getLength()), e); + } + } + + private static String splitError(Throwable t, Path path, long start, long length) + { + return format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, t.getMessage()); + } + + public static OrcReader getOrcReader(OrcEncoding orcEncoding, List columns, boolean useOrcColumnNames, OrcFileTailSource orcFileTailSource, StripeMetadataSourceFactory stripeMetadataSourceFactory, HiveFileContext hiveFileContext, OrcReaderOptions orcReaderOptions, Optional encryptionInformation, DwrfEncryptionProvider dwrfEncryptionProvider, OrcDataSource orcDataSource, Path path) + throws IOException + { + DwrfKeyProvider dwrfKeyProvider = new ProjectionBasedDwrfKeyProvider(encryptionInformation, 
columns, useOrcColumnNames, path); + OrcReader reader = new OrcReader( + orcDataSource, + orcEncoding, + orcFileTailSource, + stripeMetadataSourceFactory, + new HiveOrcAggregatedMemoryContext(), + orcReaderOptions, + hiveFileContext.isCacheable(), + dwrfEncryptionProvider, + dwrfKeyProvider, + hiveFileContext.getStats()); + return reader; + } + + public static PrestoException mapToPrestoException(Exception e, Path path, HiveFileSplit fileSplit) + { + // instanceof and class comparison do not work here since they are loaded by different class loaders. + if (e.getClass().getName().equals(UncheckedExecutionException.class.getName()) && e.getCause() instanceof PrestoException) { + return (PrestoException) e.getCause(); + } + if (e instanceof PrestoException) { + return (PrestoException) e; + } + String message = splitError(e, path, fileSplit.getStart(), fileSplit.getLength()); + if (e.getClass().getSimpleName().equals("BlockMissingException")) { + return new PrestoException(HIVE_MISSING_DATA, message, e); + } + return new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java index e0f257b7441ee..fce496f2fec64 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java @@ -39,10 +39,8 @@ import com.facebook.presto.hive.SubfieldExtractor; import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.orc.DwrfEncryptionProvider; -import com.facebook.presto.orc.DwrfKeyProvider; import com.facebook.presto.orc.OrcAggregatedMemoryContext; import com.facebook.presto.orc.OrcDataSource; -import com.facebook.presto.orc.OrcDataSourceId; import com.facebook.presto.orc.OrcEncoding; import com.facebook.presto.orc.OrcPredicate; import 
com.facebook.presto.orc.OrcReader; @@ -70,10 +68,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.units.DataSize; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -81,7 +77,6 @@ import javax.inject.Inject; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -106,26 +101,23 @@ import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveBucketing.getHiveBucket; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcLazyReadSmallRanges; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxBufferSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxReadBlockSize; -import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcStreamBufferSize; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcTinyStripeThreshold; import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcBloomFiltersEnabled; import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcZstdJniDecompressionEnabled; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; import static 
com.facebook.presto.hive.HiveSessionProperties.isAdaptiveFilterReorderingEnabled; import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles; import static com.facebook.presto.hive.HiveUtil.typedPartitionKey; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcDataSource; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcReader; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.mapToPrestoException; import static com.facebook.presto.orc.DwrfEncryptionProvider.NO_ENCRYPTION; import static com.facebook.presto.orc.OrcEncoding.ORC; import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE; import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableBiMap.toImmutableBiMap; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -288,9 +280,10 @@ public static ConnectorPageSource createOrcPageSource( { checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1"); + OrcDataSource orcDataSource = getOrcDataSource(session, fileSplit, hdfsEnvironment, configuration, hiveFileContext, stats); + Path path = new Path(fileSplit.getPath()); + DataSize maxMergeDistance = getOrcMaxMergeDistance(session); - DataSize maxBufferSize = getOrcMaxBufferSize(session); - DataSize streamBufferSize = getOrcStreamBufferSize(session); DataSize tinyStripeThreshold = getOrcTinyStripeThreshold(session); DataSize maxReadBlockSize = getOrcMaxReadBlockSize(session); OrcReaderOptions orcReaderOptions = OrcReaderOptions.builder() @@ -300,49 +293,22 @@ public static ConnectorPageSource createOrcPageSource( .withZstdJniDecompressionEnabled(isOrcZstdJniDecompressionEnabled(session)) 
.withAppendRowNumber(appendRowNumberEnabled) .build(); - boolean lazyReadSmallRanges = getOrcLazyReadSmallRanges(session); - - OrcDataSource orcDataSource; - Path path = new Path(fileSplit.getPath()); - try { - FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(session.getUser(), path, configuration).openFile(path, hiveFileContext); - orcDataSource = new HdfsOrcDataSource( - new OrcDataSourceId(fileSplit.getPath()), - fileSplit.getFileSize(), - maxMergeDistance, - maxBufferSize, - streamBufferSize, - lazyReadSmallRanges, - inputStream, - stats); - } - catch (PrestoException e) { - throw e; - } - catch (Exception e) { - if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || - e instanceof FileNotFoundException) { - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e); - } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, fileSplit.getStart(), fileSplit.getLength()), e); - } - OrcAggregatedMemoryContext systemMemoryUsage = new HiveOrcAggregatedMemoryContext(); try { checkArgument(!domainPredicate.isNone(), "Unexpected NONE domain"); - DwrfKeyProvider dwrfKeyProvider = new ProjectionBasedDwrfKeyProvider(encryptionInformation, columns, useOrcColumnNames, path); - OrcReader reader = new OrcReader( - orcDataSource, + OrcReader reader = getOrcReader( orcEncoding, + columns, + useOrcColumnNames, orcFileTailSource, stripeMetadataSourceFactory, - systemMemoryUsage, + hiveFileContext, orcReaderOptions, - hiveFileContext.isCacheable(), + encryptionInformation, dwrfEncryptionProvider, - dwrfKeyProvider, - hiveFileContext.getStats()); + orcDataSource, + path); List physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader.getTypes(), path); @@ -427,18 +393,7 @@ public static ConnectorPageSource createOrcPageSource( } catch (IOException ignored) { } - // instanceof and class comparison do not work here since they are loaded by different class loaders. 
- if (e.getClass().getName().equals(UncheckedExecutionException.class.getName()) && e.getCause() instanceof PrestoException) { - throw (PrestoException) e.getCause(); - } - if (e instanceof PrestoException) { - throw (PrestoException) e; - } - String message = splitError(e, path, fileSplit.getStart(), fileSplit.getLength()); - if (e.getClass().getSimpleName().equals("BlockMissingException")) { - throw new PrestoException(HIVE_MISSING_DATA, message, e); - } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + throw mapToPrestoException(e, path, fileSplit); } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java index 9d5e5ca6c8745..74bac95c65b4b 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java @@ -329,7 +329,7 @@ public void testOrc(int rowCount) assertThatFileFormat(ORC) .withColumns(TEST_COLUMNS) .withRowsCount(rowCount) - .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); + .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); } @Test(dataProvider = "rowCount") @@ -352,7 +352,7 @@ public void testOrcOptimizedWriter(int rowCount) .withSession(session) .withFileWriterFactory(new OrcFileWriterFactory(HDFS_ENVIRONMENT, new OutputStreamDataSinkFactory(), FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE, STATS, new OrcFileWriterConfig(), NO_ENCRYPTION)) .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT)) - 
.isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); + .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); } @Test(dataProvider = "rowCount") @@ -391,7 +391,7 @@ public void testOrcUseColumnNames(int rowCount) .withRowsCount(rowCount) .withReadColumns(Lists.reverse(TEST_COLUMNS)) .withSession(session) - .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, true, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); + .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, true, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); } @Test(dataProvider = "rowCount") @@ -545,7 +545,7 @@ public void testTruncateVarcharColumn() assertThatFileFormat(ORC) .withWriteColumns(ImmutableList.of(writeColumn)) .withReadColumns(ImmutableList.of(readColumn)) - .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); + .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))); assertThatFileFormat(PARQUET) .withWriteColumns(ImmutableList.of(writeColumn)) @@ -593,7 +593,7 @@ public void testFailForLongVarcharPartitionColumn() assertThatFileFormat(ORC) .withColumns(columns) - 
.isFailingForPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())), expectedErrorCode, expectedMessage); + .isFailingForPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())), expectedErrorCode, expectedMessage); assertThatFileFormat(PARQUET) .withColumns(columns) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java index 7cfbcf738062a..889b1023d7dc0 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestOrcBatchPageSourceMemoryTracking.java @@ -107,7 +107,6 @@ import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveFileContext.DEFAULT_HIVE_FILE_CONTEXT; import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER; -import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION; import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; import static com.facebook.presto.hive.HiveTestUtils.SESSION; @@ -456,7 +455,6 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec OrcBatchPageSourceFactory orcPageSourceFactory = new OrcBatchPageSourceFactory( FUNCTION_AND_TYPE_MANAGER, - FUNCTION_RESOLUTION, false, HDFS_ENVIRONMENT, stats, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java 
b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java index 53ed9ef765875..44fe42c5e37bb 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/benchmark/FileFormat.java @@ -161,7 +161,6 @@ public ConnectorPageSource createFileFormatReader(ConnectorSession session, Hdfs { HiveBatchPageSourceFactory pageSourceFactory = new OrcBatchPageSourceFactory( FUNCTION_AND_TYPE_MANAGER, - FUNCTION_RESOLUTION, false, hdfsEnvironment, new FileFormatDataSourceStats(), From 835b0471cce61fa34f8ed518579bda645e7b6c83 Mon Sep 17 00:00:00 2001 From: Rebecca Schlussel Date: Thu, 22 Feb 2024 16:48:46 -0500 Subject: [PATCH 3/5] Improve error handling for partial aggregation pushdown Improve error handling for partial aggregation pushdown and prevent returning wrong results when footer stats should not be relied on. This covers the following cases: 1. Aggregations have been pushed down but partition file format does not support aggregation pushdown (can occur if table is declared with a supported storage format, but partition has a different storage format). Previously, page source providers for some file formats had special handling for this case, but not all 2. Always throw an exception if aggregations have been pushed down but partition footer stats are unreliable. Previously, if filter pushdown was enabled (used OrcSelectivePageSourceFactory), we wouldn't create an AggregatedPageSource, so you would get an error somewhere on read. If it was disabled (OrcBatchPageSourceFactory), we would create an AggregatedPageSource and the query would silently give wrong results. 3. Unexpected state where some but not all columns are of AGGREGATED type. Error handling is still going to be reader dependent if both the table and partition format support partial aggregation pushdown, but the partition format does not support as many types (e.g. parquet vs. 
orc) --- .../hive/s3select/S3SelectTestHelper.java | 2 + .../hive/HiveAggregatedPageSourceFactory.java | 36 +++ .../presto/hive/HiveClientModule.java | 8 + .../presto/hive/HivePageSourceProvider.java | 99 ++++-- .../orc/DwrfAggregatedPageSourceFactory.java | 132 ++++++++ .../orc/OrcAggregatedPageSourceFactory.java | 209 +++++++++++++ .../ParquetAggregatedPageSourceFactory.java | 182 +++++++++++ .../presto/hive/AbstractTestHiveClient.java | 11 +- .../hive/AbstractTestHiveFileSystem.java | 6 +- .../facebook/presto/hive/HiveTestUtils.java | 14 + .../presto/hive/TestDynamicPruning.java | 10 +- .../presto/hive/TestHivePageSink.java | 11 +- .../hive/TestHivePageSourceProvider.java | 291 ++++++++++++++++++ .../hive/TestHivePushdownFilterQueries.java | 3 +- 14 files changed, 982 insertions(+), 32 deletions(-) create mode 100644 presto-hive/src/main/java/com/facebook/presto/hive/HiveAggregatedPageSourceFactory.java create mode 100644 presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfAggregatedPageSourceFactory.java create mode 100644 presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcAggregatedPageSourceFactory.java create mode 100644 presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java diff --git a/presto-hive-hadoop2/src/test/java/com/facebook/presto/hive/s3select/S3SelectTestHelper.java b/presto-hive-hadoop2/src/test/java/com/facebook/presto/hive/s3select/S3SelectTestHelper.java index 001d5b6bb2942..7399b4cc242ee 100644 --- a/presto-hive-hadoop2/src/test/java/com/facebook/presto/hive/s3select/S3SelectTestHelper.java +++ b/presto-hive-hadoop2/src/test/java/com/facebook/presto/hive/s3select/S3SelectTestHelper.java @@ -83,6 +83,7 @@ import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER; import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION; import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; +import static 
com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveSelectivePageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultS3HiveRecordCursorProvider; @@ -199,6 +200,7 @@ public S3SelectTestHelper(String host, getDefaultS3HiveRecordCursorProvider(config, metastoreClientConfig), getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), + getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig), FUNCTION_AND_TYPE_MANAGER, ROW_EXPRESSION_SERVICE); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveAggregatedPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveAggregatedPageSourceFactory.java new file mode 100644 index 0000000000000..99fc7ec4c1026 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveAggregatedPageSourceFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.facebook.presto.hive; + +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; +import java.util.Optional; + +public interface HiveAggregatedPageSourceFactory +{ + Optional createPageSource( + Configuration configuration, + ConnectorSession session, + HiveFileSplit fileSplit, + Storage storage, + List columns, + HiveFileContext hiveFileContext, + Optional encryptionInformation, + boolean appendRowNumberEnabled); +} diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java index 8c31e8893a74d..a291b0bf4ce0c 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java @@ -27,13 +27,16 @@ import com.facebook.presto.hive.metastore.HiveMetastoreCacheStats; import com.facebook.presto.hive.metastore.HivePartitionMutator; import com.facebook.presto.hive.metastore.MetastoreCacheStats; +import com.facebook.presto.hive.orc.DwrfAggregatedPageSourceFactory; import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory; import com.facebook.presto.hive.orc.DwrfSelectivePageSourceFactory; +import com.facebook.presto.hive.orc.OrcAggregatedPageSourceFactory; import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory; import com.facebook.presto.hive.orc.OrcSelectivePageSourceFactory; import com.facebook.presto.hive.orc.TupleDomainFilterCache; import com.facebook.presto.hive.pagefile.PageFilePageSourceFactory; import com.facebook.presto.hive.pagefile.PageFileWriterFactory; +import com.facebook.presto.hive.parquet.ParquetAggregatedPageSourceFactory; import com.facebook.presto.hive.parquet.ParquetFileWriterFactory; import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; import 
com.facebook.presto.hive.parquet.ParquetSelectivePageSourceFactory; @@ -209,6 +212,11 @@ public void configure(Binder binder) selectivePageSourceFactoryBinder.addBinding().to(DwrfSelectivePageSourceFactory.class).in(Scopes.SINGLETON); selectivePageSourceFactoryBinder.addBinding().to(ParquetSelectivePageSourceFactory.class).in(Scopes.SINGLETON); + Multibinder aggregatedPageSourceFactoryBinder = newSetBinder(binder, HiveAggregatedPageSourceFactory.class); + aggregatedPageSourceFactoryBinder.addBinding().to(OrcAggregatedPageSourceFactory.class).in(Scopes.SINGLETON); + aggregatedPageSourceFactoryBinder.addBinding().to(DwrfAggregatedPageSourceFactory.class).in(Scopes.SINGLETON); + aggregatedPageSourceFactoryBinder.addBinding().to(ParquetAggregatedPageSourceFactory.class).in(Scopes.SINGLETON); + binder.bind(DataSinkFactory.class).to(OutputStreamDataSinkFactory.class).in(Scopes.SINGLETON); Multibinder fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java index 292bb240df5f0..1613cdf572a4c 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java @@ -69,6 +69,7 @@ import static com.facebook.presto.hive.HiveCoercer.createCoercer; import static com.facebook.presto.hive.HiveColumnHandle.isPushedDownSubfield; import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static com.facebook.presto.hive.HivePageSourceProvider.ColumnMapping.toColumnHandles; import static com.facebook.presto.hive.HiveSessionProperties.isUseRecordPageSourceForCustomSplit; import static com.facebook.presto.hive.HiveUtil.getPrefilledColumnValue; @@ -97,6 +98,7 @@ public class 
HivePageSourceProvider private final Set cursorProviders; private final Set pageSourceFactories; private final Set selectivePageSourceFactories; + private Set aggregatedPageSourceFactories; private final TypeManager typeManager; private final RowExpressionService rowExpressionService; private final LoadingCache optimizedRowExpressionCache; @@ -108,6 +110,7 @@ public HivePageSourceProvider( Set cursorProviders, Set pageSourceFactories, Set selectivePageSourceFactories, + Set aggregatedPageSourceFactories, TypeManager typeManager, RowExpressionService rowExpressionService) { @@ -117,6 +120,7 @@ public HivePageSourceProvider( this.cursorProviders = ImmutableSet.copyOf(requireNonNull(cursorProviders, "cursorProviders is null")); this.pageSourceFactories = ImmutableSet.copyOf(requireNonNull(pageSourceFactories, "pageSourceFactories is null")); this.selectivePageSourceFactories = ImmutableSet.copyOf(requireNonNull(selectivePageSourceFactories, "selectivePageSourceFactories is null")); + this.aggregatedPageSourceFactories = ImmutableSet.copyOf(requireNonNull(aggregatedPageSourceFactories, "aggregatedPageSourceFactories is null")); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); this.optimizedRowExpressionCache = CacheBuilder.newBuilder() @@ -153,6 +157,30 @@ public ConnectorPageSource createPageSource( path); Optional encryptionInformation = hiveSplit.getEncryptionInformation(); + CacheQuota cacheQuota = generateCacheQuota(hiveSplit); + HiveFileContext fileContext = new HiveFileContext( + splitContext.isCacheable(), + cacheQuota, + hiveSplit.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new), + OptionalLong.of(hiveSplit.getFileSplit().getFileSize()), + OptionalLong.of(hiveSplit.getFileSplit().getStart()), + OptionalLong.of(hiveSplit.getFileSplit().getLength()), + hiveSplit.getFileSplit().getFileModifiedTime(), + 
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session)); + + if (columns.stream().anyMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED))) { + checkArgument(columns.stream().allMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED)), "Not all columns are of 'AGGREGATED' type"); + + if (hiveLayout.isFooterStatsUnreliable()) { + throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, format("Partial aggregation pushdown is not supported when footer stats are unreliable. " + + "Table %s has file %s with unreliable footer stats. " + + "Set session property [catalog-name].pushdown_partial_aggregations_into_scan=false and execute query again.", + hiveLayout.getSchemaTableName(), + hiveSplit.getFileSplit().getPath())); + } + + return createAggregatedPageSource(aggregatedPageSourceFactories, configuration, session, hiveSplit, hiveLayout, selectedColumns, fileContext, encryptionInformation); + } if (hiveLayout.isPushdownFilterEnabled()) { Optional selectivePageSource = createSelectivePageSource( selectivePageSourceFactories, @@ -165,6 +193,7 @@ public ConnectorPageSource createPageSource( typeManager, optimizedRowExpressionCache, splitContext, + fileContext, encryptionInformation); if (selectivePageSource.isPresent()) { return selectivePageSource.get(); @@ -183,7 +212,6 @@ public ConnectorPageSource createPageSource( return new HiveEmptySplitPageSource(); } - CacheQuota cacheQuota = generateCacheQuota(hiveSplit); Optional pageSource = createHivePageSource( cursorProviders, pageSourceFactories, @@ -206,15 +234,7 @@ public ConnectorPageSource createPageSource( hiveSplit.getTableToPartitionMapping(), hiveSplit.getBucketConversion(), hiveSplit.isS3SelectPushdownEnabled(), - new HiveFileContext( - splitContext.isCacheable(), - cacheQuota, - hiveSplit.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new), - OptionalLong.of(hiveSplit.getFileSplit().getFileSize()), - 
OptionalLong.of(hiveSplit.getFileSplit().getStart()), - OptionalLong.of(hiveSplit.getFileSplit().getLength()), - hiveSplit.getFileSplit().getFileModifiedTime(), - HiveSessionProperties.isVerboseRuntimeStatsEnabled(session)), + fileContext, hiveLayout.getRemainingPredicate(), hiveLayout.isPushdownFilterEnabled(), rowExpressionService, @@ -225,6 +245,47 @@ public ConnectorPageSource createPageSource( throw new IllegalStateException("Could not find a file reader for split " + hiveSplit); } + private ConnectorPageSource createAggregatedPageSource( + Set aggregatedPageSourceFactories, + Configuration configuration, + ConnectorSession session, + HiveSplit hiveSplit, + HiveTableLayoutHandle hiveLayout, + List selectedColumns, + HiveFileContext fileContext, + Optional encryptionInformation) + { + for (HiveAggregatedPageSourceFactory pageSourceFactory : aggregatedPageSourceFactories) { + List columnMappings = ColumnMapping.buildColumnMappings( + hiveSplit.getPartitionKeys(), + selectedColumns, + hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()), + hiveSplit.getTableToPartitionMapping(), + hiveSplit.getFileSplit(), + hiveSplit.getTableBucketNumber()); + + List regularAndInterimColumnMappings = ColumnMapping.extractRegularAndInterimColumnMappings(columnMappings); + Optional pageSource = pageSourceFactory.createPageSource( + configuration, + session, + hiveSplit.getFileSplit(), + hiveSplit.getStorage(), + toColumnHandles(regularAndInterimColumnMappings, true), + fileContext, + encryptionInformation, + hiveLayout.isAppendRowNumberEnabled()); + if (pageSource.isPresent()) { + return pageSource.get(); + } + } + throw new PrestoException( + HIVE_UNSUPPORTED_FORMAT, + format("Table %s has file of format %s that does not support partial aggregation pushdown. 
" + + "Set session property [catalog-name].pushdown_partial_aggregations_into_scan=false and execute query again.", + hiveLayout.getSchemaTableName(), + hiveSplit.getStorage().getStorageFormat().getSerDe())); + } + @VisibleForTesting protected static CacheQuota generateCacheQuota(HiveSplit hiveSplit) { @@ -254,6 +315,7 @@ private static Optional createSelectivePageSource( TypeManager typeManager, LoadingCache rowExpressionCache, SplitContext splitContext, + HiveFileContext fileContext, Optional encryptionInformation) { Set interimColumns = ImmutableSet.builder() @@ -302,7 +364,6 @@ private static Optional createSelectivePageSource( return Optional.of(new HiveEmptySplitPageSource()); } - CacheQuota cacheQuota = generateCacheQuota(split); for (HiveSelectivePageSourceFactory pageSourceFactory : selectivePageSourceFactories) { Optional pageSource = pageSourceFactory.createPageSource( configuration, @@ -318,15 +379,7 @@ private static Optional createSelectivePageSource( handle -> new Subfield(((HiveColumnHandle) handle).getName())).intersect(layout.getDomainPredicate())).orElse(layout.getDomainPredicate()), optimizedRemainingPredicate, hiveStorageTimeZone, - new HiveFileContext( - splitContext.isCacheable(), - cacheQuota, - split.getFileSplit().getExtraFileInfo().map(BinaryExtraHiveFileInfo::new), - OptionalLong.of(split.getFileSplit().getFileSize()), - OptionalLong.of(split.getFileSplit().getStart()), - OptionalLong.of(split.getFileSplit().getLength()), - split.getFileSplit().getFileModifiedTime(), - HiveSessionProperties.isVerboseRuntimeStatsEnabled(session)), + fileContext, encryptionInformation, layout.isAppendRowNumberEnabled(), layout.isFooterStatsUnreliable()); @@ -516,12 +569,6 @@ private static Optional getPageSourceFromCursorProvider( List regularAndInterimColumnMappings, Optional bucketAdaptation) { - if (!hiveColumns.isEmpty() && hiveColumns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) { - throw new 
UnsupportedOperationException("Partial aggregation pushdown only supported for ORC/Parquet files. " + - "Table " + tableName.toString() + " has file (" + fileSplit.getPath() + ") of format " + storage.getStorageFormat().getOutputFormat() + - ". Set session property hive.pushdown_partial_aggregations_into_scan=false and execute query again"); - } - for (HiveRecordCursorProvider provider : cursorProviders) { // GenericHiveRecordCursor will automatically do the coercion without HiveCoercionRecordCursor boolean doCoercion = !(provider instanceof GenericHiveRecordCursorProvider); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfAggregatedPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfAggregatedPageSourceFactory.java new file mode 100644 index 0000000000000..0877cc2229725 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfAggregatedPageSourceFactory.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive.orc; + +import com.facebook.hive.orc.OrcSerde; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.hive.EncryptionInformation; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveAggregatedPageSourceFactory; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.HiveFileContext; +import com.facebook.presto.hive.HiveFileSplit; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.orc.StripeMetadataSourceFactory; +import com.facebook.presto.orc.cache.OrcFileTailSource; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import org.apache.hadoop.conf.Configuration; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; +import static com.facebook.presto.hive.orc.OrcAggregatedPageSourceFactory.createOrcPageSource; +import static com.facebook.presto.orc.DwrfEncryptionProvider.NO_ENCRYPTION; +import static com.facebook.presto.orc.OrcEncoding.DWRF; +import static java.util.Objects.requireNonNull; + +public class DwrfAggregatedPageSourceFactory + implements HiveAggregatedPageSourceFactory +{ + private final TypeManager typeManager; + private final StandardFunctionResolution functionResolution; + private final boolean useOrcColumnNames; + private final HdfsEnvironment hdfsEnvironment; + private final FileFormatDataSourceStats stats; + private final OrcFileTailSource orcFileTailSource; + private final StripeMetadataSourceFactory stripeMetadataSourceFactory; + + @Inject + public DwrfAggregatedPageSourceFactory( + TypeManager typeManager, + 
StandardFunctionResolution functionResolution, + HiveClientConfig config, + HdfsEnvironment hdfsEnvironment, + FileFormatDataSourceStats stats, + OrcFileTailSource orcFileTailSource, + StripeMetadataSourceFactory stripeMetadataSourceFactory) + { + this( + typeManager, + functionResolution, + requireNonNull(config, "hiveClientConfig is null").isUseOrcColumnNames(), + hdfsEnvironment, + stats, + orcFileTailSource, + stripeMetadataSourceFactory); + } + + public DwrfAggregatedPageSourceFactory( + TypeManager typeManager, + StandardFunctionResolution functionResolution, + boolean useOrcColumnNames, + HdfsEnvironment hdfsEnvironment, + FileFormatDataSourceStats stats, + OrcFileTailSource orcFileTailSource, + StripeMetadataSourceFactory stripeMetadataSourceFactory) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); + this.useOrcColumnNames = useOrcColumnNames; + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.stats = requireNonNull(stats, "stats is null"); + this.orcFileTailSource = requireNonNull(orcFileTailSource, "orcFileTailCache is null"); + this.stripeMetadataSourceFactory = requireNonNull(stripeMetadataSourceFactory, "stripeMetadataSourceFactory is null"); + } + + @Override + public Optional createPageSource( + Configuration configuration, + ConnectorSession session, + HiveFileSplit fileSplit, + Storage storage, + List columns, + HiveFileContext hiveFileContext, + Optional encryptionInformation, + boolean appendRowNumberEnabled) + { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { + return Optional.empty(); + } + + if (fileSplit.getFileSize() == 0) { + throw new PrestoException(HIVE_BAD_DATA, "ORC file is empty: " + fileSplit.getPath()); + } + + return Optional.of(createOrcPageSource( + session, + DWRF, + hdfsEnvironment, + configuration, + fileSplit, + columns, + 
useOrcColumnNames, + typeManager, + functionResolution, + stats, + orcFileTailSource, + stripeMetadataSourceFactory, + hiveFileContext, + encryptionInformation, + NO_ENCRYPTION, + appendRowNumberEnabled)); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcAggregatedPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcAggregatedPageSourceFactory.java new file mode 100644 index 0000000000000..f0a0d3df12944 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcAggregatedPageSourceFactory.java @@ -0,0 +1,209 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive.orc; + +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.hive.EncryptionInformation; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveAggregatedPageSourceFactory; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.HiveFileContext; +import com.facebook.presto.hive.HiveFileSplit; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.orc.DwrfEncryptionProvider; +import com.facebook.presto.orc.OrcDataSource; +import com.facebook.presto.orc.OrcEncoding; +import com.facebook.presto.orc.OrcReader; +import com.facebook.presto.orc.OrcReaderOptions; +import com.facebook.presto.orc.StripeMetadataSourceFactory; +import com.facebook.presto.orc.cache.OrcFileTailSource; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.FixedPageSource; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; + +import javax.inject.Inject; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxReadBlockSize; +import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcTinyStripeThreshold; +import static com.facebook.presto.hive.HiveCommonSessionProperties.isOrcZstdJniDecompressionEnabled; +import static com.facebook.presto.hive.HiveUtil.getPhysicalHiveColumnHandles; +import static 
com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcDataSource; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.getOrcReader; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.mapToPrestoException; +import static com.facebook.presto.orc.DwrfEncryptionProvider.NO_ENCRYPTION; +import static com.facebook.presto.orc.OrcEncoding.ORC; +import static java.util.Objects.requireNonNull; + +public class OrcAggregatedPageSourceFactory + implements HiveAggregatedPageSourceFactory +{ + private final TypeManager typeManager; + private final StandardFunctionResolution functionResolution; + private final boolean useOrcColumnNames; + private final HdfsEnvironment hdfsEnvironment; + private final FileFormatDataSourceStats stats; + private final OrcFileTailSource orcFileTailSource; + private final StripeMetadataSourceFactory stripeMetadataSourceFactory; + + @Inject + public OrcAggregatedPageSourceFactory( + TypeManager typeManager, + StandardFunctionResolution functionResolution, + HiveClientConfig config, + HdfsEnvironment hdfsEnvironment, + FileFormatDataSourceStats stats, + OrcFileTailSource orcFileTailSource, + StripeMetadataSourceFactory stripeMetadataSourceFactory) + { + this( + typeManager, + functionResolution, + requireNonNull(config, "hiveClientConfig is null").isUseOrcColumnNames(), + hdfsEnvironment, + stats, + orcFileTailSource, + stripeMetadataSourceFactory); + } + + public OrcAggregatedPageSourceFactory( + TypeManager typeManager, + StandardFunctionResolution functionResolution, + boolean useOrcColumnNames, + HdfsEnvironment hdfsEnvironment, + FileFormatDataSourceStats stats, + OrcFileTailSource orcFileTailSource, + StripeMetadataSourceFactory stripeMetadataSourceFactory) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); + this.useOrcColumnNames = useOrcColumnNames; + this.hdfsEnvironment = 
requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.stats = requireNonNull(stats, "stats is null"); + this.orcFileTailSource = requireNonNull(orcFileTailSource, "orcFileTailCache is null"); + this.stripeMetadataSourceFactory = requireNonNull(stripeMetadataSourceFactory, "stripeMetadataSourceFactory is null"); + } + + @Override + public Optional createPageSource( + Configuration configuration, + ConnectorSession session, + HiveFileSplit fileSplit, + Storage storage, + List columns, + HiveFileContext hiveFileContext, + Optional encryptionInformation, + boolean appendRowNumberEnabled) + { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { + return Optional.empty(); + } + + // per HIVE-13040 and ORC-162, empty files are allowed + if (fileSplit.getFileSize() == 0) { + return Optional.of(new FixedPageSource(ImmutableList.of())); + } + + return Optional.of(createOrcPageSource( + session, + ORC, + hdfsEnvironment, + configuration, + fileSplit, + columns, + useOrcColumnNames, + typeManager, + functionResolution, + stats, + orcFileTailSource, + stripeMetadataSourceFactory, + hiveFileContext, + encryptionInformation, + NO_ENCRYPTION, + appendRowNumberEnabled)); + } + + public static ConnectorPageSource createOrcPageSource( + ConnectorSession session, + OrcEncoding orcEncoding, + HdfsEnvironment hdfsEnvironment, + Configuration configuration, + HiveFileSplit fileSplit, + List columns, + boolean useOrcColumnNames, + TypeManager typeManager, + StandardFunctionResolution functionResolution, + FileFormatDataSourceStats stats, + OrcFileTailSource orcFileTailSource, + StripeMetadataSourceFactory stripeMetadataSourceFactory, + HiveFileContext hiveFileContext, + Optional encryptionInformation, + DwrfEncryptionProvider dwrfEncryptionProvider, + boolean appendRowNumberEnabled) + { + OrcDataSource orcDataSource = getOrcDataSource(session, fileSplit, hdfsEnvironment, configuration, hiveFileContext, stats); + + DataSize maxMergeDistance = 
getOrcMaxMergeDistance(session); + DataSize tinyStripeThreshold = getOrcTinyStripeThreshold(session); + DataSize maxReadBlockSize = getOrcMaxReadBlockSize(session); + + Path path = new Path(fileSplit.getPath()); + + OrcReaderOptions orcReaderOptions = OrcReaderOptions.builder() + .withMaxMergeDistance(maxMergeDistance) + .withTinyStripeThreshold(tinyStripeThreshold) + .withMaxBlockSize(maxReadBlockSize) + .withZstdJniDecompressionEnabled(isOrcZstdJniDecompressionEnabled(session)) + .withAppendRowNumber(appendRowNumberEnabled) + .build(); + try { + OrcReader reader = getOrcReader( + orcEncoding, + columns, + useOrcColumnNames, + orcFileTailSource, + stripeMetadataSourceFactory, + hiveFileContext, + orcReaderOptions, + encryptionInformation, + dwrfEncryptionProvider, + orcDataSource, + path); + + List physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader.getTypes(), path); + + return new AggregatedOrcPageSource(physicalColumns, reader.getFooter(), typeManager, functionResolution); + } + catch (Exception e) { + try { + orcDataSource.close(); + } + catch (IOException ignored) { + } + throw mapToPrestoException(e, path, fileSplit); + } + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java new file mode 100644 index 0000000000000..8c9bbc9138e30 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java @@ -0,0 +1,182 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive.parquet; + +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.hive.EncryptionInformation; +import com.facebook.presto.hive.FileFormatDataSourceStats; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveAggregatedPageSourceFactory; +import com.facebook.presto.hive.HiveColumnHandle; +import com.facebook.presto.hive.HiveFileContext; +import com.facebook.presto.hive.HiveFileSplit; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.parquet.ParquetCorruptionException; +import com.facebook.presto.parquet.ParquetDataSource; +import com.facebook.presto.parquet.cache.ParquetMetadataSource; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; +import org.apache.parquet.crypto.HiddenColumnException; +import org.apache.parquet.crypto.InternalFileDecryptor; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; + +import javax.inject.Inject; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static 
com.facebook.presto.hive.HiveCommonSessionProperties.getReadNullMaskedParquetEncryptedValue; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; +import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource; +import static com.facebook.presto.hive.parquet.ParquetPageSourceFactory.createDecryptor; +import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED; +import static com.google.common.base.Strings.nullToEmpty; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class ParquetAggregatedPageSourceFactory + implements HiveAggregatedPageSourceFactory +{ + private static final Set PARQUET_SERDE_CLASS_NAMES = ImmutableSet.builder() + .add("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + .add("parquet.hive.serde.ParquetHiveSerDe") + .build(); + + private final TypeManager typeManager; + private final StandardFunctionResolution functionResolution; + private final HdfsEnvironment hdfsEnvironment; + private final FileFormatDataSourceStats stats; + private final ParquetMetadataSource parquetMetadataSource; + + @Inject + public ParquetAggregatedPageSourceFactory(TypeManager typeManager, + StandardFunctionResolution functionResolution, + HdfsEnvironment hdfsEnvironment, + FileFormatDataSourceStats stats, + ParquetMetadataSource parquetMetadataSource) + { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.stats = requireNonNull(stats, "stats is null"); + this.parquetMetadataSource = requireNonNull(parquetMetadataSource, "parquetMetadataSource is null"); + } + + @Override + public Optional 
createPageSource( + Configuration configuration, + ConnectorSession session, + HiveFileSplit fileSplit, + Storage storage, + List columns, + HiveFileContext hiveFileContext, + Optional encryptionInformation, + boolean appendRowNumberEnabled) /* encryptionInformation and appendRowNumberEnabled are not read by this implementation */ + { + if (!PARQUET_SERDE_CLASS_NAMES.contains(storage.getStorageFormat().getSerDe())) { /* storage is not declared with a Parquet SerDe: this factory does not apply */ + return Optional.empty(); + } + + return Optional.of(createParquetPageSource( + hdfsEnvironment, + session, + configuration, + fileSplit, + columns, + typeManager, + functionResolution, + stats, + hiveFileContext, + parquetMetadataSource)); + } + + /** Opens fileSplit's path as the session user, obtains its ParquetMetadata through parquetMetadataSource (inside hdfsEnvironment.doAs) and wraps it in an AggregatedParquetPageSource. Any Exception closes the data source (best effort) and is rethrown as a PrestoException. */ public static ConnectorPageSource createParquetPageSource( + HdfsEnvironment hdfsEnvironment, + ConnectorSession session, + Configuration configuration, + HiveFileSplit fileSplit, + List columns, + TypeManager typeManager, + StandardFunctionResolution functionResolution, + FileFormatDataSourceStats stats, + HiveFileContext hiveFileContext, + ParquetMetadataSource parquetMetadataSource) + { + String user = session.getUser(); + boolean readMaskedValue = getReadNullMaskedParquetEncryptedValue(session); + + ParquetDataSource dataSource = null; /* declared outside the try so the catch block can close it on failure */ + Path path = new Path(fileSplit.getPath()); + try { + FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(user, path, configuration).openFile(path, hiveFileContext); + // Lambda expression below requires final variable, so we define a new variable parquetDataSource.
+ final ParquetDataSource parquetDataSource = buildHdfsParquetDataSource(inputStream, path, stats); + dataSource = parquetDataSource; /* make the source visible to the catch block for cleanup */ + Optional fileDecryptor = createDecryptor(configuration, path); + ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> parquetMetadataSource.getParquetMetadata( + parquetDataSource, + fileSplit.getFileSize(), + hiveFileContext.isCacheable(), + hiveFileContext.getModificationTime(), + fileDecryptor, + readMaskedValue).getParquetMetadata()); + + /* NOTE(review): dataSource is neither closed on this success path nor handed to the page source; confirm whether it should be closed once the metadata has been read */ return new AggregatedParquetPageSource(columns, parquetMetadata, typeManager, functionResolution); + } + catch (Exception e) { + try { + if (dataSource != null) { + dataSource.close(); + } + } + catch (IOException ignored) { /* best-effort close: the original exception e is the one translated and thrown below */ + } + if (e instanceof PrestoException) { /* already a PrestoException: rethrow unchanged */ + throw (PrestoException) e; + } + if (e instanceof ParquetCorruptionException) { + throw new PrestoException(HIVE_BAD_DATA, e); + } + if (e instanceof AccessControlException) { + throw new PrestoException(PERMISSION_DENIED, e.getMessage(), e); + } + if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || + e instanceof FileNotFoundException) { + throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e); + } + String message = format("Error opening Hive split %s (offset=%s, length=%s): %s", path, fileSplit.getStart(), fileSplit.getLength(), e.getMessage()); + if (e.getClass().getSimpleName().equals("BlockMissingException")) { /* matched by simple class name, not instanceof; presumably to avoid referencing the HDFS class directly, TODO confirm */ + throw new PrestoException(HIVE_MISSING_DATA, message, e); + } + if (e instanceof HiddenColumnException) { /* replace the generic message with guidance that names the encrypted column */ + message = format("User does not have access to encryption key for encrypted column = %s.
If returning 'null' for encrypted " + + "columns is acceptable to your query, please add 'set session hive.read_null_masked_parquet_encrypted_value_enabled=true' before your query", ((HiddenColumnException) e).getColumn()); + throw new PrestoException(PERMISSION_DENIED, message, e); + } + throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + } + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index f6f75b618a950..008e1dd705f0b 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -259,6 +259,7 @@ import static com.facebook.presto.hive.HiveTestUtils.SESSION; import static com.facebook.presto.hive.HiveTestUtils.arrayType; import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties; +import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider; @@ -1079,7 +1080,15 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi new HiveWriterStats(), getDefaultOrcFileWriterFactory(hiveClientConfig, metastoreClientConfig), DEFAULT_COLUMN_CONVERTER_PROVIDER); - pageSourceProvider = new HivePageSourceProvider(hiveClientConfig, hdfsEnvironment, getDefaultHiveRecordCursorProvider(hiveClientConfig, metastoreClientConfig), getDefaultHiveBatchPageSourceFactories(hiveClientConfig, metastoreClientConfig), getDefaultHiveSelectivePageSourceFactories(hiveClientConfig, metastoreClientConfig), FUNCTION_AND_TYPE_MANAGER, ROW_EXPRESSION_SERVICE); + pageSourceProvider = new 
HivePageSourceProvider( + hiveClientConfig, + hdfsEnvironment, + getDefaultHiveRecordCursorProvider(hiveClientConfig, metastoreClientConfig), + getDefaultHiveBatchPageSourceFactories(hiveClientConfig, metastoreClientConfig), + getDefaultHiveSelectivePageSourceFactories(hiveClientConfig, metastoreClientConfig), + getDefaultHiveAggregatedPageSourceFactories(hiveClientConfig, metastoreClientConfig), + FUNCTION_AND_TYPE_MANAGER, + ROW_EXPRESSION_SERVICE); } /** diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java index 395c0e03f5d0c..204d581d389fc 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java @@ -105,6 +105,7 @@ import static com.facebook.presto.hive.HiveTestUtils.PAGE_SORTER; import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties; +import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider; @@ -262,14 +263,15 @@ protected void setup(String host, int port, String databaseName, BiFunction recordCursorProviderSet = s3SelectPushdownEnabled ? 
- getDefaultS3HiveRecordCursorProvider(config, metastoreClientConfig) : - getDefaultHiveRecordCursorProvider(config, metastoreClientConfig); + getDefaultS3HiveRecordCursorProvider(config, metastoreClientConfig) : + getDefaultHiveRecordCursorProvider(config, metastoreClientConfig); pageSourceProvider = new HivePageSourceProvider( config, hdfsEnvironment, recordCursorProviderSet, getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), + getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig), FUNCTION_AND_TYPE_MANAGER, ROW_EXPRESSION_SERVICE); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java index 83ff58459f2bd..bc1346460a632 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveTestUtils.java @@ -33,13 +33,16 @@ import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory; import com.facebook.presto.hive.gcs.HiveGcsConfig; import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; +import com.facebook.presto.hive.orc.DwrfAggregatedPageSourceFactory; import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory; import com.facebook.presto.hive.orc.DwrfSelectivePageSourceFactory; +import com.facebook.presto.hive.orc.OrcAggregatedPageSourceFactory; import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory; import com.facebook.presto.hive.orc.OrcSelectivePageSourceFactory; import com.facebook.presto.hive.orc.TupleDomainFilterCache; import com.facebook.presto.hive.pagefile.PageFilePageSourceFactory; import com.facebook.presto.hive.pagefile.PageFileWriterFactory; +import com.facebook.presto.hive.parquet.ParquetAggregatedPageSourceFactory; import com.facebook.presto.hive.parquet.ParquetPageSourceFactory; import 
com.facebook.presto.hive.rcfile.RcFilePageSourceFactory; import com.facebook.presto.hive.s3.HiveS3Config; @@ -173,6 +176,17 @@ public static Set getDefaultHiveSelectivePageSou .build(); } + public static Set getDefaultHiveAggregatedPageSourceFactories(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig) + { + FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); + HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig); + return ImmutableSet.builder() + .add(new OrcAggregatedPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))) + .add(new DwrfAggregatedPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()))) + .add(new ParquetAggregatedPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, testHdfsEnvironment, stats, new MetadataReader())) + .build(); + } + public static Set getDefaultHiveRecordCursorProvider(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig) { HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java index 1c92f8a2d5a67..1310fec39998a 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestDynamicPruning.java @@ -55,6 +55,7 @@ import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER; import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; import static 
com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment; +import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveSelectivePageSourceFactories; @@ -187,7 +188,14 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle hiveTableHandle, transaction, Optional.of(tableLayoutHandle)); - HivePageSourceProvider provider = new HivePageSourceProvider(config, createTestHdfsEnvironment(config, metastoreClientConfig), getDefaultHiveRecordCursorProvider(config, metastoreClientConfig), getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), FUNCTION_AND_TYPE_MANAGER, ROW_EXPRESSION_SERVICE); + HivePageSourceProvider provider = new HivePageSourceProvider( + config, createTestHdfsEnvironment(config, metastoreClientConfig), + getDefaultHiveRecordCursorProvider(config, metastoreClientConfig), + getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), + getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), + getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig), + FUNCTION_AND_TYPE_MANAGER, + ROW_EXPRESSION_SERVICE); return provider.createPageSource(transaction, getSession(config), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), splitContext); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java index 9ff28729e6259..b59aaa166fc92 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java +++ 
b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java @@ -78,6 +78,7 @@ import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; import static com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment; import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties; +import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider; @@ -294,7 +295,15 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa new HiveTableHandle(SCHEMA_NAME, TABLE_NAME), transaction, Optional.of(layoutHandle)); - HivePageSourceProvider provider = new HivePageSourceProvider(config, createTestHdfsEnvironment(config, metastoreClientConfig), getDefaultHiveRecordCursorProvider(config, metastoreClientConfig), getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), FUNCTION_AND_TYPE_MANAGER, ROW_EXPRESSION_SERVICE); + HivePageSourceProvider provider = new HivePageSourceProvider( + config, + createTestHdfsEnvironment(config, metastoreClientConfig), + getDefaultHiveRecordCursorProvider(config, metastoreClientConfig), + getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), + getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), + getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig), + FUNCTION_AND_TYPE_MANAGER, + ROW_EXPRESSION_SERVICE); return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE); } diff --git 
a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java index 1d09f2338f03f..a5160be8e7bb8 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java @@ -15,18 +15,26 @@ import com.facebook.presto.cache.CacheConfig; import com.facebook.presto.common.Page; +import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.TestingTypeManager; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.hive.metastore.StorageFormat; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordPageSource; import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SplitContext; import com.facebook.presto.spi.SplitWeight; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.sql.planner.optimizations.AggregationNodeUtils; import com.facebook.presto.testing.TestingConnectorSession; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -34,7 +42,9 @@ import io.airlift.slice.Slice; import io.airlift.units.DataSize; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import 
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.joda.time.DateTimeZone; @@ -48,15 +58,31 @@ import java.util.OptionalInt; import java.util.Properties; +import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; +import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; +import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.CacheQuotaRequirement.NO_CACHE_REQUIREMENT; import static com.facebook.presto.hive.CacheQuotaScope.PARTITION; +import static com.facebook.presto.hive.HiveStorageFormat.ORC; +import static com.facebook.presto.hive.HiveStorageFormat.RCBINARY; +import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static com.facebook.presto.hive.HiveTestUtils.HIVE_CLIENT_CONFIG; +import static com.facebook.presto.hive.HiveTestUtils.METADATA; +import static com.facebook.presto.hive.HiveTestUtils.METASTORE_CLIENT_CONFIG; +import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE; +import static com.facebook.presto.hive.HiveTestUtils.SESSION; +import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider; +import static com.facebook.presto.hive.HiveType.HIVE_LONG; import static com.facebook.presto.hive.HiveUtil.CUSTOM_FILE_SPLIT_CLASS_KEY; +import static com.facebook.presto.hive.TestHiveMetadataUpdateHandle.TEST_TABLE_NAME; import static com.facebook.presto.hive.TestHivePageSink.getColumnHandles; +import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_DATABASE; import static com.facebook.presto.hive.util.HudiRealtimeSplitConverter.HUDI_BASEPATH_KEY; import static com.facebook.presto.hive.util.HudiRealtimeSplitConverter.HUDI_DELTA_FILEPATHS_KEY; import static 
com.facebook.presto.hive.util.HudiRealtimeSplitConverter.HUDI_MAX_COMMIT_TIME_KEY; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static java.lang.String.format; import static org.testng.Assert.assertTrue; public class TestHivePageSourceProvider @@ -65,6 +91,39 @@ public class TestHivePageSourceProvider private static final String TABLE_NAME = "table"; private static final String PARTITION_NAME = "partition"; + private static final ColumnHandle LONG_COLUMN = new HiveColumnHandle( + "test_column", + HIVE_LONG, + HIVE_LONG.getTypeSignature(), + 5, + REGULAR, + Optional.empty(), + ImmutableList.of(), + Optional.empty()); + + private static final ColumnHandle LONG_AGGREGATED_COLUMN = new HiveColumnHandle( + "test_column", + HIVE_LONG, + HIVE_LONG.getTypeSignature(), + 5, + AGGREGATED, + Optional.empty(), + ImmutableList.of(), + Optional.of(AggregationNodeUtils.count(FunctionAndTypeManager.createTestFunctionAndTypeManager()))); + + public HivePageSourceProvider createPageSourceProvider() + { + return new HivePageSourceProvider( + HIVE_CLIENT_CONFIG, + HDFS_ENVIRONMENT, + getDefaultHiveRecordCursorProvider(HIVE_CLIENT_CONFIG, METASTORE_CLIENT_CONFIG), + ImmutableSet.of(new MockOrcBatchPageSourceFactory(), new MockRcBinaryBatchPageSourceFactory()), + ImmutableSet.of(new MockOrcSelectivePageSourceFactory()), + ImmutableSet.of(new MockOrcAggregatedPageSourceFactory()), + METADATA.getFunctionAndTypeManager(), + ROW_EXPRESSION_SERVICE); + } + @Test public void testGenerateCacheQuota() { @@ -247,6 +306,174 @@ public void testNotUseRecordReaderWithInputFormatAnnotationWithoutCustomSplit() assertTrue(pageSource.get() instanceof HivePageSource); } + @Test + public void testUsesPageSourceForPartition() + { + HivePageSourceProvider pageSourceProvider = createPageSourceProvider(); + ConnectorPageSource pageSource = pageSourceProvider.createPageSource( + new 
HiveTransactionHandle(), + SESSION, + getHiveSplit(ORC), + getHiveTableLayout(false, false, false), + ImmutableList.of(LONG_COLUMN), + new SplitContext(false)); /* ORC split, filter pushdown disabled: expect a HivePageSource wrapping the mock ORC batch page source */ + assertTrue(pageSource instanceof HivePageSource, format("pageSource was %s", pageSource.getClass().getSimpleName())); + assertTrue(((HivePageSource) pageSource).getPageSource() instanceof MockOrcBatchPageSource, + format("pageSource was %s", ((HivePageSource) pageSource).getPageSource().getClass().getSimpleName())); + + pageSource = pageSourceProvider.createPageSource( + new HiveTransactionHandle(), + SESSION, + getHiveSplit(RCBINARY), + getHiveTableLayout(false, false, false), + ImmutableList.of(LONG_COLUMN), + new SplitContext(false)); /* RCBINARY split, filter pushdown disabled: expect a HivePageSource wrapping the mock RCBinary batch page source */ + assertTrue(pageSource instanceof HivePageSource, format("pageSource was %s", pageSource.getClass().getSimpleName())); + assertTrue(((HivePageSource) pageSource).getPageSource() instanceof MockRcBinaryBatchPageSource, + format("pageSource was %s", ((HivePageSource) pageSource).getPageSource().getClass().getSimpleName())); + + pageSource = pageSourceProvider.createPageSource( + new HiveTransactionHandle(), + SESSION, + getHiveSplit(ORC), + getHiveTableLayout(true, false, false), + ImmutableList.of(LONG_COLUMN), + new SplitContext(false)); /* ORC split, filter pushdown enabled: expect the mock ORC selective page source itself */ + assertTrue(pageSource instanceof MockOrcSelectivePageSource, format("pageSource was %s", pageSource.getClass().getSimpleName())); + } + + @Test + public void testWrapsInFilteringPageSourceWhenNoSelectivePageSource() + { + HivePageSourceProvider pageSourceProvider = createPageSourceProvider(); + ConnectorPageSource pageSource = pageSourceProvider.createPageSource( + new HiveTransactionHandle(), + SESSION, + getHiveSplit(RCBINARY), + getHiveTableLayout(true, false, false), + ImmutableList.of(), + new SplitContext(false)); /* filter pushdown enabled for RCBINARY, which has no selective factory in createPageSourceProvider: expect a FilteringPageSource */ + assertTrue(pageSource instanceof FilteringPageSource, format("pageSource was %s", pageSource.getClass().getSimpleName())); + } + + @Test + public void testAggregatedPageSource() + { + HivePageSourceProvider
pageSourceProvider = createPageSourceProvider(); + ConnectorPageSource pageSource = pageSourceProvider.createPageSource( + new HiveTransactionHandle(), + SESSION, + getHiveSplit(ORC), + getHiveTableLayout(true, true, false), + ImmutableList.of(LONG_AGGREGATED_COLUMN), + new SplitContext(false)); /* ORC split with partial aggregations pushed down: expect the mock ORC aggregated page source */ + assertTrue(pageSource instanceof MockOrcAggregatedPageSource, format("pageSource %s", pageSource.getClass().getSimpleName())); + } + + @Test(expectedExceptions = PrestoException.class, + expectedExceptionsMessageRegExp = "Table testdb.table has file of format org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe that does not support partial aggregation pushdown. " + + "Set session property \\[catalog\\-name\\].pushdown_partial_aggregations_into_scan=false and execute query again.") + public void testFailsWhenNoAggregatedPageSourceAvailable() + { + HivePageSourceProvider pageSourceProvider = createPageSourceProvider(); + pageSourceProvider.createPageSource( /* result intentionally discarded: the call itself is expected to throw, as no aggregated factory is registered for RCBINARY */ + new HiveTransactionHandle(), + SESSION, + getHiveSplit(RCBINARY), + getHiveTableLayout(false, true, false), + ImmutableList.of(LONG_AGGREGATED_COLUMN), + new SplitContext(false)); + } + + @Test(expectedExceptions = PrestoException.class, + expectedExceptionsMessageRegExp = "Partial aggregation pushdown is not supported when footer stats are unreliable. " + + "Table testdb.table has file file://test with unreliable footer stats.
" + + "Set session property \\[catalog\\-name\\].pushdown_partial_aggregations_into_scan=false and execute query again.") + public void testFailsWhenFooterStatsUnreliable() + { + HivePageSourceProvider pageSourceProvider = createPageSourceProvider(); + pageSourceProvider.createPageSource( + new HiveTransactionHandle(), + SESSION, + getHiveSplit(ORC), + getHiveTableLayout(false, true, true), + ImmutableList.of(LONG_AGGREGATED_COLUMN), + new SplitContext(false)); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Not all columns are of 'AGGREGATED' type") + public void testFailsWhenMixOfAggregatedAndRegularColumns() + { + HivePageSourceProvider pageSourceProvider = createPageSourceProvider(); + pageSourceProvider.createPageSource( + new HiveTransactionHandle(), + SESSION, + getHiveSplit(ORC), + getHiveTableLayout(false, true, false), + ImmutableList.of(LONG_COLUMN, LONG_AGGREGATED_COLUMN), + new SplitContext(false)); + } + + private static ConnectorTableLayoutHandle getHiveTableLayout(boolean pushdownFilterEnabled, boolean partialAggregationsPushedDown, boolean footerStatsUnreliable) + { + return new HiveTableLayoutHandle( + new SchemaTableName(TEST_DATABASE, TEST_TABLE_NAME), + TEST_TABLE_NAME, + ImmutableList.of(), + ImmutableList.of(), // TODO fill out columns + ImmutableMap.of(), + TupleDomain.all(), // none + TRUE_CONSTANT, + ImmutableMap.of(), + TupleDomain.all(), + Optional.empty(), + Optional.empty(), + pushdownFilterEnabled, + "layout", + Optional.empty(), + partialAggregationsPushedDown, + true, + footerStatsUnreliable); + } + + private static HiveSplit getHiveSplit(HiveStorageFormat hiveStorageFormat) + { + HiveFileSplit fileSplit = new HiveFileSplit("file://test", + 0, + 10, + 10, + Instant.now().toEpochMilli(), + Optional.empty(), + ImmutableMap.of()); + + return new HiveSplit( + fileSplit, + SCHEMA_NAME, + TABLE_NAME, + PARTITION_NAME, + + new Storage( + 
StorageFormat.create(hiveStorageFormat.getSerDe(), hiveStorageFormat.getInputFormat(), hiveStorageFormat.getOutputFormat()), + "location", + Optional.empty(), + false, + ImmutableMap.of(), + ImmutableMap.of()), + ImmutableList.of(), + ImmutableList.of(), + OptionalInt.empty(), + OptionalInt.empty(), + NO_PREFERENCE, + getColumnHandles().size(), + TableToPartitionMapping.empty(), + Optional.empty(), + false, + NO_CACHE_REQUIREMENT, + Optional.empty(), + ImmutableSet.of(), + SplitWeight.standard()); + } + static class MockHiveBatchPageSourceFactory implements HiveBatchPageSourceFactory { @@ -399,4 +626,68 @@ public void close() { } } + + private static class MockOrcBatchPageSourceFactory + implements HiveBatchPageSourceFactory + { + @Override + public Optional createPageSource(Configuration configuration, ConnectorSession session, HiveFileSplit fileSplit, Storage storage, SchemaTableName tableName, Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation) + { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { + return Optional.empty(); + } + return Optional.of(new MockOrcBatchPageSource()); + } + } + + private static class MockOrcSelectivePageSourceFactory + implements HiveSelectivePageSourceFactory + { + @Override + public Optional createPageSource(Configuration configuration, ConnectorSession session, HiveFileSplit fileSplit, Storage storage, List columns, Map prefilledValues, Map coercers, Optional bucketAdaptation, List outputColumns, TupleDomain domainPredicate, RowExpression remainingPredicate, DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, boolean appendRowNumberEnabled, boolean footerStatsUnreliable) + { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { + return Optional.empty(); + } + return Optional.of(new MockOrcSelectivePageSource()); + } 
+ } + + private static class MockOrcAggregatedPageSourceFactory + implements HiveAggregatedPageSourceFactory + { + @Override + public Optional createPageSource(Configuration configuration, ConnectorSession session, HiveFileSplit fileSplit, Storage storage, List columns, HiveFileContext hiveFileContext, Optional encryptionInformation, boolean appendRowNumberEnabled) + { + if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { + return Optional.empty(); + } + return Optional.of(new MockOrcAggregatedPageSource()); + } + } + + private static class MockRcBinaryBatchPageSourceFactory + implements HiveBatchPageSourceFactory + { + @Override + public Optional createPageSource(Configuration configuration, ConnectorSession session, HiveFileSplit fileSplit, Storage storage, SchemaTableName tableName, Map tableParameters, List columns, TupleDomain effectivePredicate, DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation) + { + if (!storage.getStorageFormat().getSerDe().equals(LazyBinaryColumnarSerDe.class.getName())) { + return Optional.empty(); + } + return Optional.of(new MockRcBinaryBatchPageSource()); + } + } + + private static class MockOrcBatchPageSource + extends MockPageSource {} + + private static class MockOrcSelectivePageSource + extends MockPageSource {} + + private static class MockOrcAggregatedPageSource + extends MockPageSource {} + + private static class MockRcBinaryBatchPageSource + extends MockPageSource {} } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownFilterQueries.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownFilterQueries.java index 324322883fbda..ee0d5c11755bc 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownFilterQueries.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePushdownFilterQueries.java @@ -1044,7 +1044,8 @@ private void assertFileFormat(HiveStorageFormat 
storageFormat) // no filter assertQueryUsingH2Cte("SELECT * FROM test_file_format_orc", cte); assertQueryUsingH2Cte("SELECT comment FROM test_file_format_orc", cte); - assertQueryFails("SELECT COUNT(*) FROM test_file_format_orc", "Partial aggregation pushdown only supported for ORC/Parquet files. Table tpch.test_file_format_orc has file ((.*?)) of format (.*?). Set session property hive.pushdown_partial_aggregations_into_scan=false and execute query again"); + assertQueryFails("SELECT COUNT(*) FROM test_file_format_orc", "Table tpch.test_file_format_orc has file of format .* that does not support partial aggregation pushdown. " + + "Set session property \\[catalog\\-name\\].pushdown_partial_aggregations_into_scan=false and execute query again."); assertQueryUsingH2Cte(noPartialAggregationPushdown(queryRunner.getDefaultSession()), "SELECT COUNT(*) FROM test_file_format_orc", cte, Function.identity()); // filter on partition column From 97e9bc1dc8d2b118795e4524f5a79ba17d14b17f Mon Sep 17 00:00:00 2001 From: Rebecca Schlussel Date: Mon, 4 Mar 2024 11:49:16 -0500 Subject: [PATCH 4/5] Remove unneeded error handling from page source factories Remove error handling for aggregated columns from individual page source factories, as these errors are now handled in a consolidated place. This commit is separate from the main commit that consolidated the error handling for easier review. 
--- .../presto/hive/HivePageSourceProvider.java | 3 +-- .../hive/HiveSelectivePageSourceFactory.java | 3 +-- .../hive/orc/DwrfSelectivePageSourceFactory.java | 6 ++---- .../presto/hive/orc/OrcBatchPageSourceFactory.java | 5 ----- .../hive/orc/OrcSelectivePageSourceFactory.java | 14 +++----------- .../hive/parquet/ParquetPageSourceFactory.java | 5 ----- .../parquet/ParquetSelectivePageSourceFactory.java | 3 +-- .../hive/rcfile/RcFilePageSourceFactory.java | 7 ------- .../presto/hive/TestHivePageSourceProvider.java | 2 +- 9 files changed, 9 insertions(+), 39 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java index 1613cdf572a4c..c950dd7de5320 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java @@ -381,8 +381,7 @@ private static Optional createSelectivePageSource( hiveStorageTimeZone, fileContext, encryptionInformation, - layout.isAppendRowNumberEnabled(), - layout.isFooterStatsUnreliable()); + layout.isAppendRowNumberEnabled()); if (pageSource.isPresent()) { return Optional.of(pageSource.get()); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java index d78ceb636ed34..63b22cb1043ce 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSelectivePageSourceFactory.java @@ -43,6 +43,5 @@ Optional createPageSource( DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, - boolean appendRowNumberEnabled, - boolean footerStatsUnreliable); + boolean appendRowNumberEnabled); } diff --git 
a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java index 6a9e2527a8c73..c2037e41e2d5e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/DwrfSelectivePageSourceFactory.java @@ -107,8 +107,7 @@ public Optional createPageSource( DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, - boolean appendRowNumberEnabled, - boolean footerStatsUnreliable) + boolean appendRowNumberEnabled) { if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); @@ -145,7 +144,6 @@ public Optional createPageSource( tupleDomainFilterCache, encryptionInformation, dwrfEncryptionProvider, - appendRowNumberEnabled, - footerStatsUnreliable)); + appendRowNumberEnabled)); } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java index 208869ec6b4e7..1f6781247453d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSourceFactory.java @@ -57,7 +57,6 @@ import java.util.Map; import java.util.Optional; -import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxReadBlockSize; @@ -225,10 +224,6 @@ public static ConnectorPageSource createOrcPageSource( } } - if (!physicalColumns.isEmpty() && physicalColumns.stream().allMatch(hiveColumnHandle -> 
hiveColumnHandle.getColumnType() == AGGREGATED)) { - return new AggregatedOrcPageSource(physicalColumns, reader.getFooter(), typeManager, functionResolution); - } - OrcPredicate predicate = new TupleDomainOrcPredicate<>(effectivePredicate, columnReferences.build(), orcBloomFiltersEnabled, Optional.of(domainCompactionThreshold)); OrcBatchRecordReader recordReader = reader.createBatchRecordReader( diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java index fce496f2fec64..d3993e8bbc82f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSourceFactory.java @@ -98,7 +98,6 @@ import static com.facebook.presto.expressions.LogicalRowExpressions.binaryExpression; import static com.facebook.presto.expressions.LogicalRowExpressions.extractConjuncts; import static com.facebook.presto.expressions.RowExpressionNodeInliner.replaceExpression; -import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.HiveBucketing.getHiveBucket; import static com.facebook.presto.hive.HiveCommonSessionProperties.getOrcMaxMergeDistance; @@ -205,8 +204,7 @@ public Optional createPageSource( DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, - boolean appendRowNumberEnabled, - boolean footerStatsUnreliable) + boolean appendRowNumberEnabled) { if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); @@ -244,8 +242,7 @@ public Optional createPageSource( tupleDomainFilterCache, encryptionInformation, NO_ENCRYPTION, - appendRowNumberEnabled, - footerStatsUnreliable)); + appendRowNumberEnabled)); } public 
static ConnectorPageSource createOrcPageSource( @@ -275,8 +272,7 @@ public static ConnectorPageSource createOrcPageSource( TupleDomainFilterCache tupleDomainFilterCache, Optional encryptionInformation, DwrfEncryptionProvider dwrfEncryptionProvider, - boolean appendRowNumberEnabled, - boolean footerStatsUnreliable) + boolean appendRowNumberEnabled) { checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1"); @@ -312,10 +308,6 @@ public static ConnectorPageSource createOrcPageSource( List physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader.getTypes(), path); - if (!footerStatsUnreliable && !physicalColumns.isEmpty() && physicalColumns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) { - return new AggregatedOrcPageSource(physicalColumns, reader.getFooter(), typeManager, functionResolution); - } - Map indexMapping = IntStream.range(0, columns.size()) .boxed() .collect(toImmutableMap(i -> columns.get(i).getHiveColumnIndex(), i -> physicalColumns.get(i).getHiveColumnIndex())); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index 63e8410fad74c..656761451c76d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -96,7 +96,6 @@ import static com.facebook.presto.common.type.StandardTypes.TINYINT; import static com.facebook.presto.common.type.StandardTypes.VARBINARY; import static com.facebook.presto.common.type.StandardTypes.VARCHAR; -import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED; import 
static com.facebook.presto.hive.HiveColumnHandle.getPushedDownSubfield; @@ -212,10 +211,6 @@ public static ConnectorPageSource createParquetPageSource( fileDecryptor, readMaskedValue).getParquetMetadata()); - if (!columns.isEmpty() && columns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) { - return new AggregatedParquetPageSource(columns, parquetMetadata, typeManager, functionResolution); - } - FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetSelectivePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetSelectivePageSourceFactory.java index f3e0465a9b98a..0fc8282362438 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetSelectivePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetSelectivePageSourceFactory.java @@ -70,8 +70,7 @@ public Optional createPageSource( DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, - boolean appendRowNumberEnabled, - boolean footerStatsUnreliable) + boolean appendRowNumberEnabled) { if (!PARQUET_SERDE_CLASS_NAMES.contains(storage.getStorageFormat().getSerDe())) { return Optional.empty(); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java index b01e780efdfda..05023f7872784 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSourceFactory.java @@ -57,7 +57,6 @@ import java.util.Optional; import java.util.Properties; -import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.AGGREGATED; import static 
com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; @@ -109,12 +108,6 @@ public Optional createPageSource( HiveFileContext hiveFileContext, Optional encryptionInformation) { - if (!columns.isEmpty() && columns.stream().allMatch(hiveColumnHandle -> hiveColumnHandle.getColumnType() == AGGREGATED)) { - throw new UnsupportedOperationException("Partial aggregation pushdown only supported for ORC/Parquet files. " + - "Table " + tableName.toString() + " has file (" + fileSplit.getPath() + ") of format " + storage.getStorageFormat().getOutputFormat() + - ". Set session property hive.pushdown_partial_aggregations_into_scan=false and execute query again"); - } - RcFileEncoding rcFileEncoding; if (LazyBinaryColumnarSerDe.class.getName().equals(storage.getStorageFormat().getSerDe())) { rcFileEncoding = new BinaryRcFileEncoding(); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java index a5160be8e7bb8..c3c029aa27a20 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSourceProvider.java @@ -644,7 +644,7 @@ private static class MockOrcSelectivePageSourceFactory implements HiveSelectivePageSourceFactory { @Override - public Optional createPageSource(Configuration configuration, ConnectorSession session, HiveFileSplit fileSplit, Storage storage, List columns, Map prefilledValues, Map coercers, Optional bucketAdaptation, List outputColumns, TupleDomain domainPredicate, RowExpression remainingPredicate, DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, boolean appendRowNumberEnabled, boolean footerStatsUnreliable) + public Optional 
createPageSource(Configuration configuration, ConnectorSession session, HiveFileSplit fileSplit, Storage storage, List columns, Map prefilledValues, Map coercers, Optional bucketAdaptation, List outputColumns, TupleDomain domainPredicate, RowExpression remainingPredicate, DateTimeZone hiveStorageTimeZone, HiveFileContext hiveFileContext, Optional encryptionInformation, boolean appendRowNumberEnabled) { if (!OrcSerde.class.getName().equals(storage.getStorageFormat().getSerDe())) { return Optional.empty(); From ed7bb4bca304af13a30792ec1af5a07dabde8e21 Mon Sep 17 00:00:00 2001 From: Rebecca Schlussel Date: Mon, 4 Mar 2024 12:14:50 -0500 Subject: [PATCH 5/5] Consolidate error handling for ParquetPageSourceFactory Create a utility method so we can share the error handling code between aggregated and batch page source factories. --- .../ParquetAggregatedPageSourceFactory.java | 36 ++----------------- .../parquet/ParquetPageSourceFactory.java | 34 ++---------------- 2 files changed, 4 insertions(+), 66 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java index 8c9bbc9138e30..7144141e289c4 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetAggregatedPageSourceFactory.java @@ -22,39 +22,29 @@ import com.facebook.presto.hive.HiveFileContext; import com.facebook.presto.hive.HiveFileSplit; import com.facebook.presto.hive.metastore.Storage; -import com.facebook.presto.parquet.ParquetCorruptionException; import com.facebook.presto.parquet.ParquetDataSource; import com.facebook.presto.parquet.cache.ParquetMetadataSource; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; -import com.facebook.presto.spi.PrestoException; import
com.facebook.presto.spi.function.StandardFunctionResolution; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.AccessControlException; -import org.apache.parquet.crypto.HiddenColumnException; import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import javax.inject.Inject; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.Set; import static com.facebook.presto.hive.HiveCommonSessionProperties.getReadNullMaskedParquetEncryptedValue; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.mapToPrestoException; import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource; import static com.facebook.presto.hive.parquet.ParquetPageSourceFactory.createDecryptor; -import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED; -import static com.google.common.base.Strings.nullToEmpty; -import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class ParquetAggregatedPageSourceFactory @@ -154,29 +144,7 @@ public static ConnectorPageSource createParquetPageSource( } catch (IOException ignored) { } - if (e instanceof PrestoException) { - throw (PrestoException) e; - } - if (e instanceof ParquetCorruptionException) { - throw new PrestoException(HIVE_BAD_DATA, e); - } - if (e instanceof AccessControlException) { - throw new PrestoException(PERMISSION_DENIED, e.getMessage(), e); - } - if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || - e instanceof 
FileNotFoundException) { - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e); - } - String message = format("Error opening Hive split %s (offset=%s, length=%s): %s", path, fileSplit.getStart(), fileSplit.getLength(), e.getMessage()); - if (e.getClass().getSimpleName().equals("BlockMissingException")) { - throw new PrestoException(HIVE_MISSING_DATA, message, e); - } - if (e instanceof HiddenColumnException) { - message = format("User does not have access to encryption key for encrypted column = %s. If returning 'null' for encrypted " + - "columns is acceptable to your query, please add 'set session hive.read_null_masked_parquet_encrypted_value_enabled=true' before your query", ((HiddenColumnException) e).getColumn()); - throw new PrestoException(PERMISSION_DENIED, message, e); - } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + throw mapToPrestoException(e, path, fileSplit); } } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index 656761451c76d..efbca4fe040f6 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -31,7 +31,6 @@ import com.facebook.presto.hive.metastore.Storage; import com.facebook.presto.memory.context.AggregatedMemoryContext; import com.facebook.presto.parquet.Field; -import com.facebook.presto.parquet.ParquetCorruptionException; import com.facebook.presto.parquet.ParquetDataSource; import com.facebook.presto.parquet.RichColumnDescriptor; import com.facebook.presto.parquet.cache.ParquetMetadataSource; @@ -49,11 +48,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.AccessControlException; import 
org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.crypto.DecryptionPropertiesFactory; import org.apache.parquet.crypto.FileDecryptionProperties; -import org.apache.parquet.crypto.HiddenColumnException; import org.apache.parquet.crypto.InternalFileDecryptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -69,7 +66,6 @@ import javax.inject.Inject; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -105,11 +101,9 @@ import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetBatchReaderVerificationEnabled; import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetBatchReadsEnabled; import static com.facebook.presto.hive.HiveCommonSessionProperties.isUseParquetColumnNames; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA; import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH; import static com.facebook.presto.hive.HiveSessionProperties.columnIndexFilterEnabled; +import static com.facebook.presto.hive.orc.OrcPageSourceFactoryUtils.mapToPrestoException; import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource; import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static com.facebook.presto.parquet.ParquetTypeUtils.columnPathFromSubfield; @@ -121,9 +115,7 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.nestedColumnPath; import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate; import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; -import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED; import 
static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.nullToEmpty; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; @@ -325,29 +317,7 @@ else if (getParquetType(type, fileSchema, useParquetColumnNames, column, tableNa } catch (IOException ignored) { } - if (e instanceof PrestoException) { - throw (PrestoException) e; - } - if (e instanceof ParquetCorruptionException) { - throw new PrestoException(HIVE_BAD_DATA, e); - } - if (e instanceof AccessControlException) { - throw new PrestoException(PERMISSION_DENIED, e.getMessage(), e); - } - if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") || - e instanceof FileNotFoundException) { - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e); - } - String message = format("Error opening Hive split %s (offset=%s, length=%s): %s", path, fileSplit.getStart(), fileSplit.getLength(), e.getMessage()); - if (e.getClass().getSimpleName().equals("BlockMissingException")) { - throw new PrestoException(HIVE_MISSING_DATA, message, e); - } - if (e instanceof HiddenColumnException) { - message = format("User does not have access to encryption key for encrypted column = %s. If returning 'null' for encrypted " + - "columns is acceptable to your query, please add 'set session hive.read_null_masked_parquet_encrypted_value_enabled=true' before your query", ((HiddenColumnException) e).getColumn()); - throw new PrestoException(PERMISSION_DENIED, message, e); - } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + throw mapToPrestoException(e, path, fileSplit); } }