Skip to content

Commit

Permalink
Fix native external table test for tpch queries
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawalreetika authored and aditi-pandit committed Nov 11, 2024
1 parent 14b56d8 commit 61c7b5f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.Session;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.testing.QueryRunner;
Expand All @@ -31,27 +32,36 @@
import java.util.Optional;

import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createCustomer;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitemStandard;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNationWithFormat;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPart;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPartSupp;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createRegion;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createSupplier;
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.createExternalTable;
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.createSchemaIfNotExist;
import static com.facebook.presto.nativeworker.SymlinkManifestGeneratorUtils.cleanupSymlinkData;
import static com.facebook.presto.tpch.TpchMetadata.getPrestoType;
import static java.lang.String.format;

public abstract class AbstractTestNativeHiveExternalTableTpchQueries
extends AbstractTestNativeTpchQueries
{
private static final String HIVE = "hive";
private static final String TPCH = "tpch";
private static final String TPCH_SCHEMA = "tpch_schema";
private static final String TPCH_EXTERNAL_SCHEMA = "tpch_external_schema";
private static final String SYMLINK_FOLDER = "symlink_tables_manifests";
private static final String HIVE_DATA = "hive_data";
private static final ImmutableList<String> TPCH_TABLES = ImmutableList.of("orders", "lineitem", "nation",
"customer", "part", "partsupp", "region", "supplier");

@Override
public Session getSession()
{
return Session.builder(super.getSession()).setCatalog(HIVE).setSchema(TPCH_EXTERNAL_SCHEMA).build();
}

/**
* Returns the Hive type string corresponding to a given TPCH type.
* Currently only supports conversion for "integer" TPCH type.
Expand All @@ -77,13 +87,17 @@ private static String getHiveTypeString(String tpchType)
* @param tableName the name of the TPCH table
* @return a list of Column objects representing the columns of the table
*/
private static List<Column> getTpchTableColumns(String tableName)
private static List<Column> getTpchTableColumns(String tableName, boolean castDateToVarchar)
{
TpchTable<?> table = TpchTable.getTable(tableName);
ColumnNaming columnNaming = ColumnNaming.SIMPLIFIED;
ImmutableList.Builder<Column> columns = ImmutableList.builder();
for (TpchColumn<? extends TpchEntity> column : table.getColumns()) {
columns.add(new Column(columnNaming.getName(column), HiveType.valueOf(getHiveTypeString(getPrestoType(column).getDisplayName())), Optional.empty(), Optional.empty()));
HiveType hiveType = HiveType.valueOf(getHiveTypeString(getPrestoType(column).getDisplayName()));
if (castDateToVarchar && hiveType.getHiveTypeName().toString().equals("date")) {
hiveType = HiveType.valueOf("string");
}
columns.add(new Column(columnNaming.getName(column), hiveType, Optional.empty(), Optional.empty()));
}
return columns.build();
}
Expand All @@ -92,17 +106,19 @@ private static List<Column> getTpchTableColumns(String tableName)
protected void createTables()
{
QueryRunner javaQueryRunner = (QueryRunner) getExpectedQueryRunner();
createOrders(javaQueryRunner);
createLineitem(javaQueryRunner);
createNationWithFormat(javaQueryRunner, "PARQUET");
createCustomer(javaQueryRunner);
createPart(javaQueryRunner);
createPartSupp(javaQueryRunner);
createRegion(javaQueryRunner);
createSupplier(javaQueryRunner);
createSchemaIfNotExist(javaQueryRunner, TPCH_SCHEMA);
Session session = Session.builder(super.getSession()).setCatalog(HIVE).setSchema(TPCH_SCHEMA).build();
createOrders(session, javaQueryRunner, true);
createLineitemStandard(session, javaQueryRunner);
createNationWithFormat(session, javaQueryRunner, "PARQUET");
createCustomer(session, javaQueryRunner);
createPart(session, javaQueryRunner);
createPartSupp(session, javaQueryRunner);
createRegion(session, javaQueryRunner);
createSupplier(session, javaQueryRunner);

for (String tableName : TPCH_TABLES) {
createExternalTable(javaQueryRunner, TPCH, tableName, getTpchTableColumns(tableName));
createExternalTable(javaQueryRunner, TPCH_SCHEMA, tableName, getTpchTableColumns(tableName, true), TPCH_EXTERNAL_SCHEMA);
}
}

Expand All @@ -111,11 +127,12 @@ public void tearDown()
{
QueryRunner javaQueryRunner = (QueryRunner) getExpectedQueryRunner();
for (String tableName : TPCH_TABLES) {
dropTableIfExists(javaQueryRunner, HIVE, TPCH, tableName);
dropTableIfExists(javaQueryRunner, HIVE, TPCH_EXTERNAL_SCHEMA, tableName);
dropTableIfExists(javaQueryRunner, HIVE, TPCH_SCHEMA, tableName);
}

// https://github.com/prestodb/presto/issues/23908
// assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH));
assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH_SCHEMA));
assertUpdate(format("DROP SCHEMA IF EXISTS %s.%s", HIVE, TPCH_EXTERNAL_SCHEMA));

File dataDirectory = ((DistributedQueryRunner) javaQueryRunner).getCoordinator().getDataDirectory().resolve(HIVE_DATA).toFile();
Path symlinkTableDataPath = dataDirectory.toPath().getParent().resolve(SYMLINK_FOLDER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.presto.testing.QueryRunner;

import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createCustomer;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitemForIceberg;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitemStandard;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNationWithFormat;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPart;
Expand All @@ -32,7 +32,7 @@ public abstract class AbstractTestNativeIcebergTpchQueries
protected void createTables()
{
QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner();
createLineitemForIceberg(queryRunner);
createLineitemStandard(queryRunner);
createOrders(queryRunner);
createNationWithFormat(queryRunner, storageFormat);
createCustomer(queryRunner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.Session;
import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableMap;

Expand Down Expand Up @@ -99,7 +100,7 @@ public static void createAllTables(QueryRunner queryRunner, boolean castDateToVa
*/
public static void createAllIcebergTables(QueryRunner queryRunner)
{
createLineitemForIceberg(queryRunner);
createLineitemStandard(queryRunner);
createOrders(queryRunner);
createNationWithFormat(queryRunner, ICEBERG_DEFAULT_STORAGE_FORMAT);
createCustomer(queryRunner);
Expand Down Expand Up @@ -131,10 +132,15 @@ public static void createLineitem(QueryRunner queryRunner, boolean castDateToVar
"FROM tpch.tiny.lineitem");
}

public static void createLineitemForIceberg(QueryRunner queryRunner)
public static void createLineitemStandard(QueryRunner queryRunner)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "lineitem")) {
queryRunner.execute("CREATE TABLE lineitem AS " +
createLineitemStandard(queryRunner.getDefaultSession(), queryRunner);
}

public static void createLineitemStandard(Session session, QueryRunner queryRunner)
{
if (!queryRunner.tableExists(session, "lineitem")) {
queryRunner.execute(session, "CREATE TABLE lineitem AS " +
"SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " +
" returnflag, linestatus, cast(shipdate as varchar) as shipdate, cast(commitdate as varchar) as commitdate, " +
" cast(receiptdate as varchar) as receiptdate, shipinstruct, shipmode, comment " +
Expand All @@ -149,9 +155,14 @@ public static void createOrders(QueryRunner queryRunner)

public static void createOrders(QueryRunner queryRunner, boolean castDateToVarchar)
{
queryRunner.execute("DROP TABLE IF EXISTS orders");
createOrders(queryRunner.getDefaultSession(), queryRunner, castDateToVarchar);
}

public static void createOrders(Session session, QueryRunner queryRunner, boolean castDateToVarchar)
{
queryRunner.execute(session, "DROP TABLE IF EXISTS orders");
String orderDate = castDateToVarchar ? "cast(orderdate as varchar) as orderdate" : "orderdate";
queryRunner.execute("CREATE TABLE orders AS " +
queryRunner.execute(session, "CREATE TABLE orders AS " +
"SELECT orderkey, custkey, orderstatus, totalprice, " + orderDate + ", " +
" orderpriority, clerk, shippriority, comment " +
"FROM tpch.tiny.orders");
Expand Down Expand Up @@ -192,20 +203,26 @@ public static void createNation(QueryRunner queryRunner)

public static void createNationWithFormat(QueryRunner queryRunner, String storageFormat)
{
createNationWithFormat(queryRunner.getDefaultSession(), queryRunner, storageFormat);
}

public static void createNationWithFormat(Session session, QueryRunner queryRunner, String storageFormat)
{
queryRunner.execute(session, "DROP TABLE IF EXISTS nation");
if (storageFormat.equals("PARQUET") && !queryRunner.tableExists(queryRunner.getDefaultSession(), "nation")) {
queryRunner.execute("CREATE TABLE nation AS SELECT * FROM tpch.tiny.nation");
queryRunner.execute(session, "CREATE TABLE nation AS SELECT * FROM tpch.tiny.nation");
}

if (storageFormat.equals("ORC") && !queryRunner.tableExists(queryRunner.getDefaultSession(), "nation")) {
queryRunner.execute("CREATE TABLE nation AS SELECT * FROM tpch.tiny.nation");
queryRunner.execute(session, "CREATE TABLE nation AS SELECT * FROM tpch.tiny.nation");
}

if (storageFormat.equals("JSON") && !queryRunner.tableExists(queryRunner.getDefaultSession(), "nation_json")) {
queryRunner.execute("CREATE TABLE nation_json WITH (FORMAT = 'JSON') AS SELECT * FROM tpch.tiny.nation");
queryRunner.execute(session, "CREATE TABLE nation_json WITH (FORMAT = 'JSON') AS SELECT * FROM tpch.tiny.nation");
}

if (storageFormat.equals("TEXTFILE") && !queryRunner.tableExists(queryRunner.getDefaultSession(), "nation_text")) {
queryRunner.execute("CREATE TABLE nation_text WITH (FORMAT = 'TEXTFILE') AS SELECT * FROM tpch.tiny.nation");
queryRunner.execute(session, "CREATE TABLE nation_text WITH (FORMAT = 'TEXTFILE') AS SELECT * FROM tpch.tiny.nation");
}
}

Expand Down Expand Up @@ -249,9 +266,14 @@ public static void createPrestoBenchTables(QueryRunner queryRunner)
}

public static void createCustomer(QueryRunner queryRunner)
{
createCustomer(queryRunner.getDefaultSession(), queryRunner);
}

public static void createCustomer(Session session, QueryRunner queryRunner)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "customer")) {
queryRunner.execute("CREATE TABLE customer AS " +
queryRunner.execute(session, "CREATE TABLE customer AS " +
"SELECT custkey, name, address, nationkey, phone, acctbal, comment, mktsegment " +
"FROM tpch.tiny.customer");
}
Expand Down Expand Up @@ -366,23 +388,38 @@ public static void createPrestoBenchCustomer(QueryRunner queryRunner)
}

public static void createPart(QueryRunner queryRunner)
{
createPart(queryRunner.getDefaultSession(), queryRunner);
}

public static void createPart(Session session, QueryRunner queryRunner)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "part")) {
queryRunner.execute("CREATE TABLE part AS SELECT * FROM tpch.tiny.part");
queryRunner.execute(session, "CREATE TABLE part AS SELECT * FROM tpch.tiny.part");
}
}

public static void createPartSupp(QueryRunner queryRunner)
{
createPartSupp(queryRunner.getDefaultSession(), queryRunner);
}

public static void createPartSupp(Session session, QueryRunner queryRunner)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "partsupp")) {
queryRunner.execute("CREATE TABLE partsupp AS SELECT * FROM tpch.tiny.partsupp");
queryRunner.execute(session, "CREATE TABLE partsupp AS SELECT * FROM tpch.tiny.partsupp");
}
}

public static void createRegion(QueryRunner queryRunner)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "region")) {
queryRunner.execute("CREATE TABLE region AS SELECT * FROM tpch.tiny.region");
createRegion(queryRunner.getDefaultSession(), queryRunner);
}

public static void createRegion(Session session, QueryRunner queryRunner)
{
if (!queryRunner.tableExists(session, "region")) {
queryRunner.execute(session, "CREATE TABLE region AS SELECT * FROM tpch.tiny.region");
}
}

Expand All @@ -403,8 +440,13 @@ public static void createTableToTestHiddenColumns(QueryRunner queryRunner)

public static void createSupplier(QueryRunner queryRunner)
{
if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "supplier")) {
queryRunner.execute("CREATE TABLE supplier AS SELECT * FROM tpch.tiny.supplier");
createSupplier(queryRunner.getDefaultSession(), queryRunner);
}

public static void createSupplier(Session session, QueryRunner queryRunner)
{
if (!queryRunner.tableExists(session, "supplier")) {
queryRunner.execute(session, "CREATE TABLE supplier AS SELECT * FROM tpch.tiny.supplier");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,19 @@ public static QueryRunner createJavaQueryRunner(Optional<Path> baseDataDirectory
return queryRunner;
}

public static void createExternalTable(QueryRunner queryRunner, String schemaName, String tableName, List<Column> columns)
public static void createSchemaIfNotExist(QueryRunner queryRunner, String schemaName)
{
ExtendedHiveMetastore metastore = getFileHiveMetastore((DistributedQueryRunner) queryRunner);
if (!metastore.getDatabase(METASTORE_CONTEXT, schemaName).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(schemaName));
}
}

public static void createExternalTable(QueryRunner queryRunner, String sourceSchemaName, String tableName, List<Column> columns, String targetSchemaName)
{
ExtendedHiveMetastore metastore = getFileHiveMetastore((DistributedQueryRunner) queryRunner);
File dataDirectory = ((DistributedQueryRunner) queryRunner).getCoordinator().getDataDirectory().resolve(HIVE_DATA).toFile();
Path hiveTableDataPath = dataDirectory.toPath().resolve(schemaName).resolve(tableName);
Path hiveTableDataPath = dataDirectory.toPath().resolve(sourceSchemaName).resolve(tableName);
Path symlinkTableDataPath = dataDirectory.toPath().getParent().resolve(SYMLINK_FOLDER).resolve(tableName);

try {
Expand All @@ -194,11 +202,9 @@ public static void createExternalTable(QueryRunner queryRunner, String schemaNam
throw new PrestoException(() -> CREATE_ERROR_CODE, "Failed to create symlink manifest file for table: " + tableName, e);
}

if (!metastore.getDatabase(METASTORE_CONTEXT, schemaName).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(schemaName));
}
if (!metastore.getTable(METASTORE_CONTEXT, schemaName, tableName).isPresent()) {
metastore.createTable(METASTORE_CONTEXT, createHiveSymlinkTable(schemaName, tableName, columns, symlinkTableDataPath.toString()), PRINCIPAL_PRIVILEGES, emptyList());
createSchemaIfNotExist(queryRunner, targetSchemaName);
if (!metastore.getTable(METASTORE_CONTEXT, targetSchemaName, tableName).isPresent()) {
metastore.createTable(METASTORE_CONTEXT, createHiveSymlinkTable(targetSchemaName, tableName, columns, symlinkTableDataPath.toString()), PRINCIPAL_PRIVILEGES, emptyList());
}
}

Expand Down

0 comments on commit 61c7b5f

Please sign in to comment.