Refactor S3 tests
Move the common setup methods for the Hive file system tests into a utility class.
Add more test cases for enabling S3 Select Pushdown.
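
For orientation, a minimal sketch of the resulting test pattern, assuming a test class that extends AbstractTestHiveFileSystem (so transactionManager, config, metadataFactory, pageSourceProvider, and splitManager are in scope). The utility methods and the presto_test_external_fs table come from the diff below; the test method itself is hypothetical.

@Test
public void testReadExternalFsTable()
        throws Exception
{
    // Hypothetical sketch: materialize the S3-backed table end to end via the
    // shared utility, then compare rows ignoring order.
    MaterializedResult actual = HiveFileSystemTestUtils.readTable(
            new SchemaTableName("default", "presto_test_external_fs"),
            transactionManager, config, metadataFactory, pageSourceProvider, splitManager);
    assertEqualsIgnoreOrder(
            actual,
            MaterializedResult.resultBuilder(HiveFileSystemTestUtils.newSession(config), BIGINT)
                    .row(70000L).row(8000L).row(300L) // plus one copy per additional codec variant (.gz, .bz2, .lz4)
                    .build());
}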
dnnanuti authored and pettyjamesm committed Jan 3, 2023
1 parent e0f637f commit 974bbba
Showing 5 changed files with 384 additions and 63 deletions.
10 changes: 6 additions & 4 deletions presto-hive-hadoop2/bin/run_hive_s3_tests.sh
@@ -7,6 +7,9 @@ set -euo pipefail -x
cleanup_docker_containers
start_docker_containers

# obtain Hive version
TESTS_HIVE_VERSION_MAJOR=$(get_hive_major_version)

# insert AWS credentials
exec_in_hadoop_master_container cp /etc/hadoop/conf/core-site.xml.s3-template /etc/hadoop/conf/core-site.xml
exec_in_hadoop_master_container sed -i \
@@ -18,10 +21,8 @@ exec_in_hadoop_master_container sed -i \
# create test table
table_path="s3a://${S3_BUCKET}/presto_test_external_fs/"
exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.gz "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.lz4 "${table_path}"
exec_in_hadoop_master_container hadoop fs -copyFromLocal -f /tmp/test1.csv.bz2 "${table_path}"
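# copy the same rows to S3 in all four codec variants (plain, .gz, .bz2, .lz4) in one command via brace expansion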
exec_in_hadoop_master_container hadoop fs -put -f /tmp/files/test1.csv{,.gz,.bz2,.lz4} "${table_path}"

exec_in_hadoop_master_container /usr/bin/hive -e "CREATE EXTERNAL TABLE presto_test_external_fs(t_bigint bigint) LOCATION '${table_path}'"

stop_unnecessary_hadoop_services
@@ -40,6 +41,7 @@ set +e
-Dhive.hadoop2.databaseName=default \
-Dhive.hadoop2.s3.awsAccessKey=${AWS_ACCESS_KEY_ID} \
-Dhive.hadoop2.s3.awsSecretKey=${AWS_SECRET_ACCESS_KEY} \
-Dhive.hadoop2.hiveVersionMajor="${TESTS_HIVE_VERSION_MAJOR}" \
-Dhive.hadoop2.s3.writableBucket=${S3_BUCKET}
EXIT_CODE=$?
set -e
5 changes: 1 addition & 4 deletions presto-hive-hadoop2/conf/docker-compose.yml
@@ -20,7 +20,4 @@ services:
- ../../presto-hive/src/test/sql:/files/sql:ro
- ./files/words:/usr/share/dict/words:ro
- ./files/core-site.xml.s3-template:/etc/hadoop/conf/core-site.xml.s3-template:ro
- ./files/test1.csv:/tmp/test1.csv:ro
- ./files/test1.csv.gz:/tmp/test1.csv.gz:ro
- ./files/test1.csv.lz4:/tmp/test1.csv.lz4:ro
- ./files/test1.csv.bz2:/tmp/test1.csv.bz2:ro
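# mount the whole test-files directory read-only instead of listing each file individually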
- ./files:/tmp/files:ro
presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java
@@ -60,11 +60,9 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.TestingConnectorSession;
import com.facebook.presto.testing.TestingNodeManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
@@ -79,7 +77,6 @@
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -94,6 +91,7 @@
import static com.facebook.presto.hive.AbstractTestHiveClient.filterNonHiddenColumnHandles;
import static com.facebook.presto.hive.AbstractTestHiveClient.filterNonHiddenColumnMetadata;
import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
import static com.facebook.presto.hive.HiveFileSystemTestUtils.getTableHandle;
import static com.facebook.presto.hive.HiveQueryRunner.METASTORE_CONTEXT;
import static com.facebook.presto.hive.HiveTestUtils.FILTER_STATS_CALCULATOR_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
@@ -111,7 +109,6 @@
import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE;
import static com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
@@ -265,40 +262,24 @@ protected Transaction newTransaction()
return new HiveTransaction(transactionManager, metadataFactory.get());
}

protected MaterializedResult readTable(SchemaTableName tableName)
throws IOException
{
return HiveFileSystemTestUtils.readTable(tableName, transactionManager, config, metadataFactory, pageSourceProvider, splitManager);
}

@Test
public void testGetRecords()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();

ConnectorTableHandle table = getTableHandle(metadata, this.table);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);

List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
assertEquals(layoutHandle.getPartitions().get().size(), 1);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT);

TableHandle tableHandle = new TableHandle(new ConnectorId(database), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

long sum = 0;

for (ConnectorSplit split : getAllSplits(splitSource)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE)) {
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));

for (MaterializedRow row : result) {
sum += (Long) row.getField(columnIndex.get("t_bigint"));
}
}
}
// The test table is made up of multiple S3 objects with same data and different compression codec
// formats: uncompressed | .gz | .lz4 | .bz2
assertEquals(sum, 78300 * 4);
}
assertEqualsIgnoreOrder(
readTable(table),
MaterializedResult.resultBuilder(newSession(), BIGINT)
.row(70000L).row(8000L).row(300L) // test1.csv
.row(70000L).row(8000L).row(300L) // test1.csv.gz
.row(70000L).row(8000L).row(300L) // test1.csv.bz2
.row(70000L).row(8000L).row(300L) // test1.csv.lz4
.build());
}

@Test
@@ -447,11 +428,11 @@ private void createTable(MetastoreContext metastoreContext, SchemaTableName tabl
ConnectorSession session = newSession();

// load the new table
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName);
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName, session);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, hiveTableHandle).values());

// verify the metadata
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName, session));
assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), columns);

// verify the data
Expand All @@ -478,25 +459,6 @@ private void dropTable(SchemaTableName table)
}
}

private ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
{
ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName);
checkArgument(handle != null, "table not found: %s", tableName);
return handle;
}

private static ImmutableMap<String, Integer> indexColumns(List<ColumnHandle> columnHandles)
{
ImmutableMap.Builder<String, Integer> index = ImmutableMap.builder();
int i = 0;
for (ColumnHandle columnHandle : columnHandles) {
HiveColumnHandle hiveColumnHandle = (HiveColumnHandle) columnHandle;
index.put(hiveColumnHandle.getName(), i);
i++;
}
return index.build();
}

private static class TestingHiveMetastore
extends CachingHiveMetastore
{
presto-hive/src/test/java/com/facebook/presto/hive/HiveFileSystemTestUtils.java
@@ -0,0 +1,156 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction;
import com.facebook.presto.hive.AbstractTestHiveClient.Transaction;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SplitContext;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

import static com.facebook.presto.hive.AbstractTestHiveClient.getAllSplits;
import static com.facebook.presto.hive.AbstractTestHiveFileSystem.SPLIT_SCHEDULING_CONTEXT;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;

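/**
 * Common helpers shared by the Hive file-system tests: session and
 * transaction creation, table-handle lookup, and end-to-end table reads
 * through the split manager and page-source provider.
 */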
public class HiveFileSystemTestUtils
{
private HiveFileSystemTestUtils() {}

public static MaterializedResult readTable(SchemaTableName tableName,
HiveTransactionManager transactionManager,
HiveClientConfig config,
HiveMetadataFactory metadataFactory,
ConnectorPageSourceProvider pageSourceProvider,
ConnectorSplitManager splitManager)
throws IOException
{
ConnectorMetadata metadata = null;
ConnectorSession session = null;
ConnectorSplitSource splitSource = null;

try (Transaction transaction = newTransaction(transactionManager, metadataFactory.get())) {
metadata = transaction.getMetadata();
session = newSession(config);

ConnectorTableHandle table = getTableHandle(metadata, tableName, session);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values());
List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, table, Constraint.alwaysTrue(), Optional.empty());
HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle();
TableHandle tableHandle = new TableHandle(new ConnectorId(tableName.getSchemaName()), table, transaction.getTransactionHandle(), Optional.of(layoutHandle));

metadata.beginQuery(session);

splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableHandle.getLayout().get(), SPLIT_SCHEDULING_CONTEXT);

List<Type> allTypes = getTypes(columnHandles);
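// Hidden columns are scanned too, but only data columns are kept in the
// rows returned to the caller.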
List<Type> dataTypes = getTypes(columnHandles.stream()
.filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
.collect(toImmutableList()));
MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes);

List<ConnectorSplit> splits = getAllSplits(splitSource);
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(
transaction.getTransactionHandle(),
session,
split,
tableHandle.getLayout().get(),
columnHandles,
new SplitContext(false, TupleDomain.none()))) {
MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
Object[] dataValues = IntStream.range(0, row.getFieldCount())
.filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden())
.mapToObj(row::getField)
.toArray();
result.row(dataValues);
}
}
}
return result.build();
}
finally {
cleanUpQuery(metadata, session);
closeQuietly(splitSource);
}
}

public static Transaction newTransaction(HiveTransactionManager transactionManager, HiveMetadata hiveMetadata)
{
return new HiveTransaction(transactionManager, hiveMetadata);
}

public static ConnectorSession newSession(HiveClientConfig config)
{
return new TestingConnectorSession(new HiveSessionProperties(
config,
new OrcFileWriterConfig(),
new ParquetFileWriterConfig(),
new CacheConfig()).getSessionProperties());
}

public static ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName, ConnectorSession session)
{
ConnectorTableHandle handle = metadata.getTableHandle(session, tableName);
checkArgument(handle != null, "table not found: %s", tableName);
return handle;
}

private static void closeQuietly(Closeable closeable)
{
try {
if (closeable != null) {
closeable.close();
}
}
catch (IOException ignored) {
}
}

private static void cleanUpQuery(ConnectorMetadata metadata, ConnectorSession session)
{
if (metadata != null && session != null) {
metadata.cleanupQuery(session);
}
}
}