Support nested namespaces in Iceberg's REST Catalog
denodo-research-labs authored and tdcmeehan committed Dec 3, 2024
1 parent 68b26f3 commit b35c1e2
Showing 5 changed files with 404 additions and 33 deletions.
@@ -54,6 +54,7 @@
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;

 import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
 import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
@@ -71,6 +72,7 @@
 import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
 import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
 import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
+import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toPrestoSchemaName;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
 import static com.google.common.base.Verify.verify;
@@ -137,9 +139,16 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName)
     public List<String> listSchemaNames(ConnectorSession session)
     {
         SupportsNamespaces supportsNamespaces = catalogFactory.getNamespaces(session);
-        return supportsNamespaces.listNamespaces()
+        return listNestedNamespaces(supportsNamespaces, Namespace.empty());
+    }
+
+    private List<String> listNestedNamespaces(SupportsNamespaces supportsNamespaces, Namespace parentNamespace)
+    {
+        return supportsNamespaces.listNamespaces(parentNamespace)
                 .stream()
-                .map(IcebergPrestoModelConverters::toPrestoSchemaName)
+                .flatMap(childNamespace -> Stream.concat(
+                        Stream.of(toPrestoSchemaName(childNamespace)),
+                        listNestedNamespaces(supportsNamespaces, childNamespace).stream()))
                .collect(toList());
     }
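The new helper walks the namespace tree depth-first: each child namespace is emitted as a Presto schema name and then expanded in turn. A minimal, self-contained sketch of the same traversal, using a hypothetical in-memory tree in place of a real SupportsNamespaces catalog (names are illustrative, not part of the commit):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestedNamespaceListingSketch
{
    // Hypothetical namespace tree standing in for SupportsNamespaces.listNamespaces(parent):
    // the catalog contains namespaces a, a.b, a.b.c, and d.
    private static final Map<String, List<String>> CHILDREN = Map.of(
            "", List.of("a", "d"),
            "a", List.of("a.b"),
            "a.b", List.of("a.b.c"),
            "a.b.c", List.of(),
            "d", List.of());

    // Mirrors listNestedNamespaces: emit each child namespace, then recurse into it.
    static List<String> listNested(String parent)
    {
        return CHILDREN.get(parent).stream()
                .flatMap(child -> Stream.concat(Stream.of(child), listNested(child).stream()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        // Prints [a, a.b, a.b.c, d]: every nested namespace surfaces as a Presto schema name.
        System.out.println(listNested(""));
    }
}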

@@ -191,9 +200,9 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace)
             throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views");
         }
         Schema schema = toIcebergSchema(viewMetadata.getColumns());
-        ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(TableIdentifier.of(viewMetadata.getTable().getSchemaName(), viewMetadata.getTable().getTableName()))
+        ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(toIcebergTableIdentifier(viewMetadata.getTable()))
                 .withSchema(schema)
-                .withDefaultNamespace(Namespace.of(viewMetadata.getTable().getSchemaName()))
+                .withDefaultNamespace(toIcebergNamespace(Optional.ofNullable(viewMetadata.getTable().getSchemaName())))
                 .withQuery(VIEW_DIALECT, viewData)
                 .withProperties(createIcebergViewProperties(session, nodeVersion.toString()));
         if (replace) {
@@ -212,7 +221,7 @@ public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
         if (catalog instanceof ViewCatalog) {
             for (String schema : listSchemas(session, schemaName.orElse(null))) {
                 try {
-                    for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(Namespace.of(schema))) {
+                    for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(toIcebergNamespace(Optional.ofNullable(schema)))) {
                         tableNames.add(new SchemaTableName(schema, tableIdentifier.name()));
                     }
                 }
@@ -248,8 +257,8 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, SchemaTablePrefix prefix)

         for (SchemaTableName schemaTableName : tableNames) {
             try {
-                if (((ViewCatalog) catalog).viewExists(TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()))) {
-                    View view = ((ViewCatalog) catalog).loadView(TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()));
+                if (((ViewCatalog) catalog).viewExists(toIcebergTableIdentifier(schemaTableName))) {
+                    View view = ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(schemaTableName));
                     verifyAndPopulateViews(view, schemaTableName, view.sqlFor(VIEW_DIALECT).sql(), views);
                 }
             }
@@ -268,7 +277,7 @@ public void dropView(ConnectorSession session, SchemaTableName viewName)
         if (!(catalog instanceof ViewCatalog)) {
             throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views");
         }
-        ((ViewCatalog) catalog).dropView(TableIdentifier.of(viewName.getSchemaName(), viewName.getTableName()));
+        ((ViewCatalog) catalog).dropView(toIcebergTableIdentifier(viewName));
     }

     private void verifyAndPopulateViews(View view, SchemaTableName schemaTableName, String viewData, ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> views)
@@ -14,13 +14,17 @@
 package com.facebook.presto.iceberg.util;

 import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.base.Splitter;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;

 import java.util.Optional;

 public class IcebergPrestoModelConverters
 {
+    private static final String NAMESPACE_SEPARATOR = ".";
+    private static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR).omitEmptyStrings().trimResults();
+
     private IcebergPrestoModelConverters()
     {
     }
@@ -32,7 +36,10 @@ public static String toPrestoSchemaName(Namespace icebergNamespace)

     public static Namespace toIcebergNamespace(Optional<String> prestoSchemaName)
     {
-        return prestoSchemaName.map(Namespace::of).orElse(Namespace.empty());
+        if (prestoSchemaName.isPresent()) {
+            return Namespace.of(NAMESPACE_SPLITTER.splitToList(prestoSchemaName.get()).toArray(new String[0]));
+        }
+        return Namespace.empty();
     }

     public static SchemaTableName toPrestoSchemaTableName(TableIdentifier icebergTableIdentifier)
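Note the NAMESPACE_SPLITTER configuration: omitEmptyStrings() and trimResults() (standard Guava Splitter options) make the conversion tolerant of stray separators and whitespace. Some illustrative inputs, assuming Guava's documented semantics:

// toIcebergNamespace(Optional.of("level1.level2"))     -> Namespace.of("level1", "level2")
// toIcebergNamespace(Optional.of(" level1 . level2 ")) -> Namespace.of("level1", "level2")  // trimResults
// toIcebergNamespace(Optional.of("level1..level2"))    -> Namespace.of("level1", "level2")  // omitEmptyStrings
// toIcebergNamespace(Optional.of(""))                  -> Namespace.empty()                 // no segments
// toIcebergNamespace(Optional.empty())                 -> Namespace.empty()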
@@ -42,11 +49,11 @@ public static SchemaTableName toPrestoSchemaTableName(TableIdentifier icebergTableIdentifier)

     public static TableIdentifier toIcebergTableIdentifier(SchemaTableName prestoSchemaTableName)
     {
-        return toIcebergTableIdentifier(prestoSchemaTableName.getSchemaName(), prestoSchemaTableName.getTableName());
+        return toIcebergTableIdentifier(toIcebergNamespace(Optional.ofNullable(prestoSchemaTableName.getSchemaName())), prestoSchemaTableName.getTableName());
     }

-    public static TableIdentifier toIcebergTableIdentifier(String prestoSchemaName, String prestoTableName)
+    public static TableIdentifier toIcebergTableIdentifier(Namespace icebergNamespace, String prestoTableName)
     {
-        return TableIdentifier.of(prestoSchemaName, prestoTableName);
+        return TableIdentifier.of(icebergNamespace, prestoTableName);
     }
 }
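An illustrative round trip through the updated converters (a hand-written sketch, not part of the commit):

import com.facebook.presto.spi.SchemaTableName;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.Optional;

import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;

public class ConverterRoundTripSketch
{
    public static void main(String[] args)
    {
        // A dotted Presto schema name now becomes a multi-level Iceberg namespace;
        // the old code produced the single-level Namespace.of("level1.level2").
        Namespace namespace = toIcebergNamespace(Optional.of("level1.level2"));
        System.out.println(namespace.levels().length); // 2

        // A table in a nested schema therefore maps to a TableIdentifier whose
        // namespace carries one level per path segment.
        TableIdentifier identifier = toIcebergTableIdentifier(new SchemaTableName("level1.level2", "orders"));
        System.out.println(identifier); // level1.level2.orders
    }
}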
@@ -125,8 +125,9 @@ public void testDescribeTable()
     @Test
     public void testShowCreateTable()
     {
+        String schemaName = getSession().getSchema().get();
         assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue())
-                .isEqualTo(format("CREATE TABLE iceberg.tpch.orders (\n" +
+                .isEqualTo(format("CREATE TABLE iceberg.%s.orders (\n" +
                         " \"orderkey\" bigint,\n" +
                         " \"custkey\" bigint,\n" +
                         " \"orderstatus\" varchar,\n" +
@@ -145,7 +146,7 @@ public void testShowCreateTable()
                         " metadata_delete_after_commit = false,\n" +
                         " metadata_previous_versions_max = 100,\n" +
                         " metrics_max_inferred_column = 100\n" +
-                        ")", getLocation("tpch", "orders")));
+                        ")", schemaName, getLocation(schemaName, "orders")));
     }

     @Test
@@ -397,7 +398,7 @@ public void testCreatePartitionedTableAs()
         testWithAllFileFormats(this::testCreatePartitionedTableAs);
     }

-    private void testCreatePartitionedTableAs(Session session, FileFormat fileFormat)
+    protected void testCreatePartitionedTableAs(Session session, FileFormat fileFormat)
     {
         @Language("SQL") String createTable = "" +
                 "CREATE TABLE test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH) + " " +
@@ -602,9 +603,10 @@ public void testColumnComments()
     public void testTableComments()
     {
         Session session = getSession();
+        String schemaName = session.getSchema().get();

         @Language("SQL") String createTable = "" +
-                "CREATE TABLE iceberg.tpch.test_table_comments (\n" +
+                "CREATE TABLE iceberg.%s.test_table_comments (\n" +
                 " \"_x\" bigint\n" +
                 ")\n" +
                 "COMMENT '%s'\n" +
@@ -613,10 +615,10 @@
                 " format_version = '2'\n" +
                 ")";

-        assertUpdate(format(createTable, "test table comment"));
+        assertUpdate(format(createTable, schemaName, "test table comment"));

         String createTableTemplate = "" +
-                "CREATE TABLE iceberg.tpch.test_table_comments (\n" +
+                "CREATE TABLE iceberg.%s.test_table_comments (\n" +
                 " \"_x\" bigint\n" +
                 ")\n" +
                 "COMMENT '%s'\n" +
@@ -629,7 +631,7 @@
                 " metadata_previous_versions_max = 100,\n" +
                 " metrics_max_inferred_column = 100\n" +
                 ")";
-        String createTableSql = format(createTableTemplate, "test table comment", getLocation("tpch", "test_table_comments"));
+        String createTableSql = format(createTableTemplate, schemaName, "test table comment", getLocation(schemaName, "test_table_comments"));

         MaterializedResult resultOfCreate = computeActual("SHOW CREATE TABLE test_table_comments");
         assertEquals(getOnlyElement(resultOfCreate.getOnlyColumnAsSet()), createTableSql);
@@ -651,11 +653,11 @@ public void testRollbackSnapshot()
         assertQuery(session, "SELECT * FROM test_rollback ORDER BY col0",
                 "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT)), (123, CAST(321 AS BIGINT))");

-        assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId));
+        assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", session.getSchema().get(), afterFirstInsertId));
         assertQuery(session, "SELECT * FROM test_rollback ORDER BY col0",
                 "VALUES (123, CAST(987 AS BIGINT)), (123, CAST(321 AS BIGINT))");

-        assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterCreateTableId));
+        assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", session.getSchema().get(), afterCreateTableId));
         assertEquals((long) computeActual(session, "SELECT COUNT(*) FROM test_rollback").getOnlyValue(), 1);

         dropTable(session, "test_rollback");
@@ -708,6 +710,7 @@ private void testSchemaEvolution(Session session, FileFormat fileFormat)
     private void testCreateTableLike()
     {
         Session session = getSession();
+        String schemaName = session.getSchema().get();

         assertUpdate(session, "CREATE TABLE test_create_table_like_original (col1 INTEGER, aDate DATE) WITH(format = 'PARQUET', partitioning = ARRAY['aDate'])");
         assertEquals(getTablePropertiesString("test_create_table_like_original"), format("WITH (\n" +
@@ -719,7 +722,7 @@
                 " metadata_previous_versions_max = 100,\n" +
                 " metrics_max_inferred_column = 100,\n" +
                 " partitioning = ARRAY['adate']\n" +
-                ")", getLocation("tpch", "test_create_table_like_original")));
+                ")", getLocation(schemaName, "test_create_table_like_original")));

         assertUpdate(session, "CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)");
         assertUpdate(session, "INSERT INTO test_create_table_like_copy0 (col1, aDate, col2) VALUES (1, CAST('1950-06-28' AS DATE), 3)", 1);
@@ -735,7 +738,7 @@
                 " metadata_delete_after_commit = false,\n" +
                 " metadata_previous_versions_max = 100,\n" +
                 " metrics_max_inferred_column = 100\n" +
-                ")", getLocation("tpch", "test_create_table_like_copy1")));
+                ")", getLocation(schemaName, "test_create_table_like_copy1")));
         dropTable(session, "test_create_table_like_copy1");

         assertUpdate(session, "CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)");
@@ -747,7 +750,7 @@
                 " metadata_delete_after_commit = false,\n" +
                 " metadata_previous_versions_max = 100,\n" +
                 " metrics_max_inferred_column = 100\n" +
-                ")", getLocation("tpch", "test_create_table_like_copy2")));
+                ")", getLocation(schemaName, "test_create_table_like_copy2")));
         dropTable(session, "test_create_table_like_copy2");

         assertUpdate(session, "CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)");
@@ -761,8 +764,8 @@
                 " metrics_max_inferred_column = 100,\n" +
                 " partitioning = ARRAY['adate']\n" +
                 ")", catalogType.equals(CatalogType.HIVE) ?
-                        getLocation("tpch", "test_create_table_like_original") :
-                        getLocation("tpch", "test_create_table_like_copy3")));
+                        getLocation(schemaName, "test_create_table_like_original") :
+                        getLocation(schemaName, "test_create_table_like_copy3")));
         dropTable(session, "test_create_table_like_copy3");

         assertUpdate(session, "CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = 'ORC')");
@@ -776,8 +779,8 @@
                 " metrics_max_inferred_column = 100,\n" +
                 " partitioning = ARRAY['adate']\n" +
                 ")", catalogType.equals(CatalogType.HIVE) ?
-                        getLocation("tpch", "test_create_table_like_original") :
-                        getLocation("tpch", "test_create_table_like_copy4")));
+                        getLocation(schemaName, "test_create_table_like_original") :
+                        getLocation(schemaName, "test_create_table_like_copy4")));
         dropTable(session, "test_create_table_like_copy4");

         dropTable(session, "test_create_table_like_original");
@@ -1168,11 +1171,12 @@ protected void cleanupTableWithMergeOnRead(String tableName)
     @Test
     public void testMergeOnReadEnabled()
     {
+        String schemaName = getSession().getSchema().get();
         String tableName = "test_merge_on_read_enabled";
         try {
             Session session = getSession();

-            createTableWithMergeOnRead(session, "tpch", tableName);
+            createTableWithMergeOnRead(session, schemaName, tableName);
             assertUpdate(session, "INSERT INTO " + tableName + " VALUES (1, 1)", 1);
             assertUpdate(session, "INSERT INTO " + tableName + " VALUES (2, 2)", 1);
             assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, 1), (2, 2)");
@@ -1192,7 +1196,7 @@ public void testMergeOnReadDisabled()
                     .setCatalogSessionProperty(ICEBERG_CATALOG, "merge_on_read_enabled", "false")
                     .build();

-            createTableWithMergeOnRead(session, "tpch", tableName);
+            createTableWithMergeOnRead(session, session.getSchema().get(), tableName);
             assertQueryFails(session, "INSERT INTO " + tableName + " VALUES (1, 1)", errorMessage);
             assertQueryFails(session, "INSERT INTO " + tableName + " VALUES (2, 2)", errorMessage);
             assertQueryFails(session, "SELECT * FROM " + tableName, errorMessage);
@@ -1849,6 +1853,7 @@ public void testUpdatingCommitRetries()
     public void testUpdateNonExistentTable()
     {
         assertQuerySucceeds("ALTER TABLE IF EXISTS non_existent_test_table1 SET PROPERTIES (commit_retries = 6)");
-        assertQueryFails("ALTER TABLE non_existent_test_table2 SET PROPERTIES (commit_retries = 6)", "Table does not exist: iceberg.tpch.non_existent_test_table2");
+        assertQueryFails("ALTER TABLE non_existent_test_table2 SET PROPERTIES (commit_retries = 6)",
+                format("Table does not exist: iceberg.%s.non_existent_test_table2", getSession().getSchema().get()));
     }
 }
@@ -146,7 +146,7 @@ public static DistributedQueryRunner createIcebergQueryRunner(
             Optional<Path> dataDirectory)
             throws Exception
     {
-        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, false);
+        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, false, Optional.empty());
     }

     public static DistributedQueryRunner createIcebergQueryRunner(
@@ -160,12 +160,28 @@ public static DistributedQueryRunner createIcebergQueryRunner(
             Optional<Path> dataDirectory,
             boolean addStorageFormatToPath)
             throws Exception
     {
+        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, addStorageFormatToPath, Optional.empty());
+    }
+
+    public static DistributedQueryRunner createIcebergQueryRunner(
+            Map<String, String> extraProperties,
+            Map<String, String> extraConnectorProperties,
+            FileFormat format,
+            boolean createTpchTables,
+            boolean addJmxPlugin,
+            OptionalInt nodeCount,
+            Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
+            Optional<Path> dataDirectory,
+            boolean addStorageFormatToPath,
+            Optional<String> schemaName)
+            throws Exception
+    {
         setupLogging();

         Session session = testSessionBuilder()
                 .setCatalog(ICEBERG_CATALOG)
-                .setSchema("tpch")
+                .setSchema(schemaName.orElse("tpch"))
                 .build();

         DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
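With the new trailing parameter, a test can stand up a runner whose default session schema is a nested namespace. A hypothetical call (argument values are illustrative defaults, not taken from the commit; ImmutableMap is Guava's):

// Sketch: run the harness against a nested schema instead of the default "tpch".
DistributedQueryRunner nestedRunner = createIcebergQueryRunner(
        ImmutableMap.of(),              // extraProperties
        ImmutableMap.of(),              // extraConnectorProperties
        FileFormat.PARQUET,             // format (assuming the connector's PARQUET constant)
        true,                           // createTpchTables
        false,                          // addJmxPlugin
        OptionalInt.empty(),            // nodeCount
        Optional.empty(),               // externalWorkerLauncher
        Optional.empty(),               // dataDirectory
        false,                          // addStorageFormatToPath
        Optional.of("level1.level2"));  // schemaName: split into a nested Iceberg namespace by the converters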