diff --git a/presto-hive-hadoop2/bin/run_hive_s3_tests.sh b/presto-hive-hadoop2/bin/run_hive_s3_tests.sh
index 47a23830e9ef2..445876c27a015 100755
--- a/presto-hive-hadoop2/bin/run_hive_s3_tests.sh
+++ b/presto-hive-hadoop2/bin/run_hive_s3_tests.sh
@@ -7,6 +7,9 @@ set -euo pipefail -x
 cleanup_docker_containers
 start_docker_containers
 
+# obtain Hive version
+TESTS_HIVE_VERSION_MAJOR=$(get_hive_major_version)
+
 # insert AWS credentials
 exec_in_hadoop_master_container cp /etc/hadoop/conf/core-site.xml.s3-template /etc/hadoop/conf/core-site.xml
 exec_in_hadoop_master_container sed -i \
@@ -18,10 +21,8 @@ exec_in_hadoop_master_container sed -i \
 # create test table
 table_path="s3a://${S3_BUCKET}/presto_test_external_fs/"
 exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}"
-exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv "${table_path}"
-exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.gz "${table_path}"
-exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.lz4 "${table_path}"
-exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.bz2 "${table_path}"
+exec_in_hadoop_master_container hadoop fs -put -f /tmp/files/test1.csv{,.gz,.bz2,.lz4} "${table_path}"
+
 exec_in_hadoop_master_container /usr/bin/hive -e "CREATE EXTERNAL TABLE presto_test_external_fs(t_bigint bigint) LOCATION '${table_path}'"
 
 stop_unnecessary_hadoop_services
@@ -40,6 +41,7 @@ set +e
     -Dhive.hadoop2.databaseName=default \
     -Dhive.hadoop2.s3.awsAccessKey=${AWS_ACCESS_KEY_ID} \
    -Dhive.hadoop2.s3.awsSecretKey=${AWS_SECRET_ACCESS_KEY} \
+    -Dhive.hadoop2.hiveVersionMajor="${TESTS_HIVE_VERSION_MAJOR}" \
     -Dhive.hadoop2.s3.writableBucket=${S3_BUCKET}
 EXIT_CODE=$?
 set -e
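Note on the consolidated copy above: "hadoop fs -put" accepts multiple source paths, and bash expands the brace pattern "/tmp/files/test1.csv{,.gz,.bz2,.lz4}" into the four file paths (test1.csv, test1.csv.gz, test1.csv.bz2, test1.csv.lz4) before hadoop runs, so the single -put is equivalent to the four copyFromLocal invocations it replaces.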
diff --git a/presto-hive-hadoop2/conf/docker-compose.yml b/presto-hive-hadoop2/conf/docker-compose.yml
index 5abc18d02a07c..28d1a18458879 100644
--- a/presto-hive-hadoop2/conf/docker-compose.yml
+++ b/presto-hive-hadoop2/conf/docker-compose.yml
@@ -20,7 +20,4 @@ services:
       - ../../presto-hive/src/test/sql:/files/sql:ro
       - ./files/words:/usr/share/dict/words:ro
       - ./files/core-site.xml.s3-template:/etc/hadoop/conf/core-site.xml.s3-template:ro
-      - ./files/test1.csv:/tmp/test1.csv:ro
-      - ./files/test1.csv.gz:/tmp/test1.csv.gz:ro
-      - ./files/test1.csv.lz4:/tmp/test1.csv.lz4:ro
-      - ./files/test1.csv.bz2:/tmp/test1.csv.bz2:ro
+      - ./files:/tmp/files:ro
\ No newline at end of file
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java
index 69d97223066bc..db575567e3dc7 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java
@@ -60,11 +60,9 @@
 import com.facebook.presto.sql.analyzer.FeaturesConfig;
 import com.facebook.presto.sql.gen.JoinCompiler;
 import com.facebook.presto.testing.MaterializedResult;
-import com.facebook.presto.testing.MaterializedRow;
 import com.facebook.presto.testing.TestingConnectorSession;
 import com.facebook.presto.testing.TestingNodeManager;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.net.HostAndPort;
@@ -79,7 +77,6 @@
 import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -94,6 +91,7 @@
 import static com.facebook.presto.hive.AbstractTestHiveClient.filterNonHiddenColumnHandles;
 import static com.facebook.presto.hive.AbstractTestHiveClient.filterNonHiddenColumnMetadata;
 import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
+import static com.facebook.presto.hive.HiveFileSystemTestUtils.getTableHandle;
 import static com.facebook.presto.hive.HiveQueryRunner.METASTORE_CONTEXT;
 import static com.facebook.presto.hive.HiveTestUtils.FILTER_STATS_CALCULATOR_SERVICE;
 import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
@@ -111,7 +109,6 @@
 import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE;
 import static com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
 import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
 import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
@@ -265,40 +262,24 @@ protected Transaction newTransaction()
         return new HiveTransaction(transactionManager, metadataFactory.get());
     }
 
+    protected MaterializedResult readTable(SchemaTableName tableName)
+            throws IOException
+    {
+        return HiveFileSystemTestUtils.readTable(tableName, transactionManager, config, metadataFactory, pageSourceProvider, splitManager);
+    }
+
     @Test
     public void testGetRecords()
             throws Exception
     {
-        try (Transaction transaction = newTransaction()) {
-            ConnectorMetadata metadata = transaction.getMetadata();
-            ConnectorSession session = newSession();
-
-            ConnectorTableHandle table = getTableHandle(metadata, this.table);
-            List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
-            Map<String, Integer> columnIndex = indexColumns(columnHandles);
-
-            List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
-            HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
-            assertEquals(layoutHandle.getPartitions().get().size(), 1);
-            ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT);
-
-            TableHandle tableHandle = new TableHandle(new ConnectorId(database), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));
-
-            long sum = 0;
-
-            for (ConnectorSplit split : getAllSplits(splitSource)) {
-                try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE)) {
-                    MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
-
-                    for (MaterializedRow row : result) {
-                        sum += (Long) row.getField(columnIndex.get("t_bigint"));
-                    }
-                }
-            }
-            // The test table is made up of multiple S3 objects with same data and different compression codec
-            // formats: uncompressed | .gz | .lz4 | .bz2
-            assertEquals(sum, 78300 * 4);
-        }
+        assertEqualsIgnoreOrder(
+                readTable(table),
+                MaterializedResult.resultBuilder(newSession(), BIGINT)
+                        .row(70000L).row(8000L).row(300L) // test1.csv
+                        .row(70000L).row(8000L).row(300L) // test1.csv.gz
+                        .row(70000L).row(8000L).row(300L) // test1.csv.bz2
+                        .row(70000L).row(8000L).row(300L) // test1.csv.lz4
+                        .build());
     }
 
     @Test
@@ -447,11 +428,11 @@ private void createTable(MetastoreContext metastoreContext, SchemaTableName tabl
         ConnectorSession session = newSession();
 
         // load the new table
-        ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName);
+        ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName, session);
         List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, hiveTableHandle).values());
 
         // verify the metadata
-        ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
+        ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName, session));
         assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), columns);
 
         // verify the data
@@ -478,25 +459,6 @@ private void dropTable(SchemaTableName table)
         }
     }
 
-    private ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
-    {
-        ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName);
-        checkArgument(handle != null, "table not found: %s", tableName);
-        return handle;
-    }
-
-    private static ImmutableMap<String, Integer> indexColumns(List<ColumnHandle> columnHandles)
-    {
-        ImmutableMap.Builder<String, Integer> index = ImmutableMap.builder();
-        int i = 0;
-        for (ColumnHandle columnHandle : columnHandles) {
-            HiveColumnHandle hiveColumnHandle = (HiveColumnHandle) columnHandle;
-            index.put(hiveColumnHandle.getName(), i);
-            i++;
-        }
-        return index.build();
-    }
-
     private static class TestingHiveMetastore
             extends CachingHiveMetastore
     {
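For context on the refactoring above: concrete file-system tests can now read a whole table through the new protected readTable(SchemaTableName) wrapper instead of driving splits and page sources by hand. A minimal sketch of a derived test (the class name is hypothetical, for illustration only; real subclasses also perform host and metastore setup before tests run):

    package com.facebook.presto.hive;

    import com.facebook.presto.spi.SchemaTableName;
    import com.facebook.presto.testing.MaterializedResult;
    import org.testng.annotations.Test;

    import static org.testng.Assert.assertEquals;

    // Hypothetical subclass of AbstractTestHiveFileSystem, for illustration only.
    public class TestHiveFileSystemExample
            extends AbstractTestHiveFileSystem
    {
        @Test
        public void testReadExternalTable()
                throws Exception
        {
            // Materializes every split of the table into a single MaterializedResult.
            MaterializedResult result = readTable(new SchemaTableName("default", "presto_test_external_fs"));
            assertEquals(result.getRowCount(), 12); // 3 rows in each of the 4 compression variants
        }
    }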
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java
new file mode 100644
index 0000000000000..1c807490e7d65
--- /dev/null
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.hive;
+
+import com.facebook.presto.cache.CacheConfig;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction;
+import com.facebook.presto.hive.AbstractTestHiveClient.Transaction;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SplitContext;
+import com.facebook.presto.spi.TableHandle;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.testing.MaterializedResult;
+import com.facebook.presto.testing.MaterializedRow;
+import com.facebook.presto.testing.TestingConnectorSession;
+import com.google.common.collect.ImmutableList;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
+import static com.facebook.presto.hive.AbstractTestHiveFileSystem.SPLIT_SCHEDULING_CONTEXT;
+import static com.facebook.presto.hive.HiveTestUtils.getTypes;
+import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+public class HiveFileSystemTestUtils
+{
+    private HiveFileSystemTestUtils() {}
+
+    public static MaterializedResult readTable(SchemaTableName tableName,
+            HiveTransactionManager transactionManager,
+            HiveClientConfig config,
+            HiveMetadataFactory metadataFactory,
+            ConnectorPageSourceProvider pageSourceProvider,
+            ConnectorSplitManager splitManager)
+            throws IOException
+    {
+        ConnectorMetadata metadata = null;
+        ConnectorSession session = null;
+        ConnectorSplitSource splitSource = null;
+
+        try (Transaction transaction = newTransaction(transactionManager, metadataFactory.get())) {
+            metadata = transaction.getMetadata();
+            session = newSession(config);
+
+            ConnectorTableHandle table = getTableHandle(metadata, tableName, session);
+            List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
+            List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
+            HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
+            TableHandle tableHandle = new TableHandle(new ConnectorId(tableName.getSchemaName()), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));
+
+            metadata.beginQuery(session);
+
+            splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle.getLayout().get(), SPLIT_SCHEDULING_CONTEXT);
+
+            List<Type> allTypes = getTypes(columnHandles);
+            List<Type> dataTypes = getTypes(columnHandles.stream()
+                    .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
+                    .collect(toImmutableList()));
+            MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);
+
+            List<ConnectorSplit> splits = getAllSplits(splitSource);
+            for (ConnectorSplit split : splits) {
+                try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(
+                        transaction.getTransactionHandle(),
+                        session,
+                        split,
+                        tableHandle.getLayout().get(),
+                        columnHandles,
+                        new SplitContext(false, TupleDomain.none()))) {
+                    MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
+                    for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
+                        Object[] dataValues = IntStream.range(0, row.getFieldCount())
+                                .filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden())
+                                .mapToObj(row::getField)
+                                .toArray();
+                        result.row(dataValues);
+                    }
+                }
+            }
+            return result.build();
+        }
+        finally {
+            cleanUpQuery(metadata, session);
+            closeQuietly(splitSource);
+        }
+    }
+
+    public static Transaction newTransaction(HiveTransactionManager transactionManager, HiveMetadata hiveMetadata)
+    {
+        return new HiveTransaction(transactionManager, hiveMetadata);
+    }
+
+    public static ConnectorSession newSession(HiveClientConfig config)
+    {
+        return new TestingConnectorSession(new HiveSessionProperties(
+                config,
+                new OrcFileWriterConfig(),
+                new ParquetFileWriterConfig(),
+                new CacheConfig()).getSessionProperties());
+    }
+
+    public static ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName, ConnectorSession session)
+    {
+        ConnectorTableHandle handle = metadata.getTableHandle(session, tableName);
+        checkArgument(handle != null, "table not found: %s", tableName);
+        return handle;
+    }
+
+    private static void closeQuietly(Closeable closeable)
+    {
+        try {
+            if (closeable != null) {
+                closeable.close();
+            }
+        }
+        catch (IOException ignored) {
+        }
+    }
+
+    private static void cleanUpQuery(ConnectorMetadata metadata, ConnectorSession session)
+    {
+        if (metadata != null && session != null) {
+            metadata.cleanupQuery(session);
+        }
+    }
+}
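The utility can also be called directly. A rough usage sketch, assuming the caller already holds the connector services that AbstractTestHiveFileSystem wires up (the field names below mirror that class):

    // Sketch: materialize every row of a table through the shared helper.
    MaterializedResult result = HiveFileSystemTestUtils.readTable(
            new SchemaTableName("default", "presto_test_external_fs"),
            transactionManager,   // HiveTransactionManager
            config,               // HiveClientConfig
            metadataFactory,      // HiveMetadataFactory
            pageSourceProvider,   // ConnectorPageSourceProvider
            splitManager);        // ConnectorSplitManager

Note the design choice in readTable: pages are materialized with every column handle, hidden columns included, but hidden columns are then filtered out of the returned rows, so test expectations only need to enumerate the data columns.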
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectPushdown.java b/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectPushdown.java
index 294daa6df52e0..bd54e910ce3e7 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectPushdown.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/s3select/TestS3SelectPushdown.java
@@ -13,24 +13,94 @@
  */
 package com.facebook.presto.hive.s3select;
 
+import com.facebook.presto.hive.metastore.Column;
+import com.facebook.presto.hive.metastore.Partition;
+import com.facebook.presto.hive.metastore.Storage;
+import com.facebook.presto.hive.metastore.StorageFormat;
+import com.facebook.presto.hive.metastore.Table;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.testing.TestingConnectorSession;
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.util.Optional;
+import java.util.Properties;
+
+import static com.facebook.presto.hive.HiveStorageFormat.ORC;
+import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
+import static com.facebook.presto.hive.HiveType.HIVE_BINARY;
+import static com.facebook.presto.hive.HiveType.HIVE_BOOLEAN;
+import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
+import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
+import static com.facebook.presto.hive.s3select.S3SelectPushdown.shouldEnablePushdownForTable;
+import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 public class TestS3SelectPushdown
 {
+    private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
+
     private TextInputFormat inputFormat;
+    private ConnectorSession session;
+    private Table table;
+    private Partition partition;
+    private Storage storage;
+    private Column column;
+    private Properties schema;
 
     @BeforeClass
     public void setUp()
     {
         inputFormat = new TextInputFormat();
         inputFormat.configure(new JobConf());
+
+        session = initTestingConnectorSession(true);
+
+        column = new Column("column", HIVE_BOOLEAN, Optional.empty(), Optional.empty());
+
+        storage = Storage.builder()
+                .setStorageFormat(fromHiveStorageFormat(TEXTFILE))
+                .setLocation("location")
+                .build();
+
+        partition = new Partition(
+                "db",
+                "table",
+                emptyList(),
+                storage,
+                singletonList(column),
+                emptyMap(),
+                Optional.empty(),
+                false,
+                false,
+                1234,
+                4567L);
+
+        table = new Table(
+                "db",
+                "table",
+                "owner",
+                EXTERNAL_TABLE,
+                storage,
+                singletonList(column),
+                emptyList(),
+                emptyMap(),
+                Optional.empty(),
+                Optional.empty());
+
+        schema = new Properties();
+        schema.setProperty(SERIALIZATION_LIB, LazySimpleSerDe.class.getName());
     }
 
     @Test
@@ -42,4 +112,138 @@ public void testIsCompressionCodecSupported()
         assertFalse(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.snappy")));
         assertTrue(S3SelectPushdown.isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.bz2")));
     }
+
+    @Test
+    public void testShouldEnableSelectPushdown()
+    {
+        assertTrue(shouldEnablePushdownForTable(session, table, "s3://fakeBucket/fakeObject", Optional.empty()));
+        assertTrue(shouldEnablePushdownForTable(session, table, "s3://fakeBucket/fakeObject", Optional.of(partition)));
+    }
+
+    @Test
+    public void testShouldNotEnableSelectPushdownWhenDisabledOnSession()
+    {
+        ConnectorSession testSession = initTestingConnectorSession(false);
+        assertFalse(shouldEnablePushdownForTable(testSession, table, "", Optional.empty()));
+    }
+
+    @Test
+    public void testShouldNotEnableSelectPushdownWhenIsNotS3StoragePath()
+    {
+        assertFalse(shouldEnablePushdownForTable(session, table, null, Optional.empty()));
+        assertFalse(shouldEnablePushdownForTable(session, table, "", Optional.empty()));
+        assertFalse(shouldEnablePushdownForTable(session, table, "s3:/invalid", Optional.empty()));
+        assertFalse(shouldEnablePushdownForTable(session, table, "s3:/invalid", Optional.of(partition)));
+    }
+
+    @Test
+    public void testShouldNotEnableSelectPushdownWhenIsNotSupportedSerde()
+    {
+        Storage newStorage = Storage.builder()
+                .setStorageFormat(fromHiveStorageFormat(ORC))
+                .setLocation("location")
+                .build();
+        Table newTable = new Table(
+                "db",
+                "table",
+                "owner",
+                EXTERNAL_TABLE,
+                newStorage,
+                singletonList(column),
+                emptyList(),
+                emptyMap(),
+                Optional.empty(),
+                Optional.empty());
+
+        assertFalse(shouldEnablePushdownForTable(session, newTable, "s3://fakeBucket/fakeObject", Optional.empty()));
+
+        Partition newPartition = new Partition(
+                "db",
+                "table",
+                emptyList(),
+                newStorage,
+                singletonList(column),
+                emptyMap(),
+                Optional.empty(),
+                false,
+                false,
+                1234,
+                4567L);
+        assertFalse(shouldEnablePushdownForTable(session, newTable, "s3://fakeBucket/fakeObject", Optional.of(newPartition)));
+    }
+
+    @Test
+    public void testShouldNotEnableSelectPushdownWhenInputFormatIsNotSupported()
+    {
+        Storage newStorage = Storage.builder()
+                .setStorageFormat(StorageFormat.create(LazySimpleSerDe.class.getName(), "inputFormat", "outputFormat"))
+                .setLocation("location")
+                .build();
+        Table newTable = new Table(
+                "db",
+                "table",
+                "owner",
+                EXTERNAL_TABLE,
+                newStorage,
+                singletonList(column),
+                emptyList(),
+                emptyMap(),
+                Optional.empty(),
+                Optional.empty());
+        assertFalse(shouldEnablePushdownForTable(session, newTable, "s3://fakeBucket/fakeObject", Optional.empty()));
+    }
+
+    @Test
+    public void testShouldNotEnableSelectPushdownWhenColumnTypesAreNotSupported()
+    {
+        Column newColumn = new Column("column", HIVE_BINARY, Optional.empty(), Optional.empty());
+        Table newTable = new Table(
+                "db",
+                "table",
+                "owner",
+                EXTERNAL_TABLE,
+                storage,
+                singletonList(newColumn),
+                emptyList(),
+                emptyMap(),
+                Optional.empty(),
+                Optional.empty());
+        assertFalse(shouldEnablePushdownForTable(session, newTable, "s3://fakeBucket/fakeObject", Optional.empty()));
+
+        Partition newPartition = new Partition(
+                "db",
+                "table",
+                emptyList(),
+                storage,
+                singletonList(column),
+                emptyMap(),
+                Optional.empty(),
+                false,
+                false,
+                1234,
+                4567L);
+        assertFalse(shouldEnablePushdownForTable(session, newTable, "s3://fakeBucket/fakeObject", Optional.of(newPartition)));
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown()
+    {
+        inputFormat = null;
+        session = null;
+        table = null;
+        partition = null;
+        storage = null;
+        column = null;
+        schema = null;
+    }
+
+    private TestingConnectorSession initTestingConnectorSession(boolean enableSelectPushdown)
+    {
+        return new TestingConnectorSession(singletonList(booleanProperty(
+                S3_SELECT_PUSHDOWN_ENABLED,
+                "S3 Select pushdown enabled",
+                true,
+                false)),
+                ImmutableMap.of(S3_SELECT_PUSHDOWN_ENABLED, enableSelectPushdown));
+    }
 }
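Taken together, the new tests pin down the preconditions for enabling S3 Select pushdown. A compilable approximation of what shouldEnablePushdownForTable appears to require, inferred purely from the assertions above (the real logic lives in S3SelectPushdown and may check more than this); it is written as if it sat inside the test class, so it reuses that class's imports and the S3_SELECT_PUSHDOWN_ENABLED constant:

    // Inferred from the tests above; not the actual implementation.
    private static boolean pushdownEligible(ConnectorSession session, Table table, String path, Optional<Partition> partition)
    {
        Storage storage = partition.map(Partition::getStorage).orElseGet(table::getStorage);
        return session.getProperty(S3_SELECT_PUSHDOWN_ENABLED, Boolean.class)                       // enabled on the session
                && path != null && path.startsWith("s3://")                                         // S3-style storage path
                && LazySimpleSerDe.class.getName().equals(storage.getStorageFormat().getSerDe())    // TEXTFILE serde only
                && TextInputFormat.class.getName().equals(storage.getStorageFormat().getInputFormat())
                && table.getDataColumns().stream()
                        .noneMatch(column -> HIVE_BINARY.equals(column.getType()));                 // e.g. BINARY is unsupported
    }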