Skip to content

Commit

Permalink
Add property to allow querying in nested namespaces
Browse files Browse the repository at this point in the history
The configuration property iceberg.rest.nested.namespace.enabled allows
nested namespace in Iceberg's REST Catalog
  • Loading branch information
denodo-research-labs authored and tdcmeehan committed Dec 3, 2024
1 parent b35c1e2 commit ae16c69
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 29 deletions.
5 changes: 5 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ Property Name Description
``iceberg.rest.auth.oauth2.credential``.
Example: ``PRINCIPAL_ROLE:ALL``

``iceberg.rest.nested.namespace.enabled`` In REST Catalogs, tables are grouped into namespaces, that can be
nested. But if a large number of recursive namespaces result in
lower performance, querying nested namespaces can be disabled.
Defaults to ``true``.

``iceberg.rest.session.type`` The session type to use when communicating with the REST catalog.
Available values are ``NONE`` or ``USER`` (default: ``NONE``).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public SupportsNamespaces getNamespaces(ConnectorSession session)
throw new PrestoException(NOT_SUPPORTED, "Iceberg catalog of type " + catalogType + " does not support namespace operations");
}

public boolean isNestedNamespaceEnabled()
{
return false;
}

protected String getCacheKey(ConnectorSession session)
{
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toPrestoSchemaName;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toPrestoSchemaTableName;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -139,7 +140,14 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTa
public List<String> listSchemaNames(ConnectorSession session)
{
SupportsNamespaces supportsNamespaces = catalogFactory.getNamespaces(session);
return listNestedNamespaces(supportsNamespaces, Namespace.empty());
if (catalogFactory.isNestedNamespaceEnabled()) {
return listNestedNamespaces(supportsNamespaces, Namespace.empty());
}

return supportsNamespaces.listNamespaces()
.stream()
.map(IcebergPrestoModelConverters::toPrestoSchemaName)
.collect(toList());
}

private List<String> listNestedNamespaces(SupportsNamespaces supportsNamespaces, Namespace parentNamespace)
Expand All @@ -161,16 +169,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
.collect(toList());
}

return catalogFactory.getCatalog(session).listTables(toIcebergNamespace(schemaName))
return catalogFactory.getCatalog(session).listTables(toIcebergNamespace(schemaName, catalogFactory.isNestedNamespaceEnabled()))
.stream()
.map(IcebergPrestoModelConverters::toPrestoSchemaTableName)
.map(tableIdentifier -> toPrestoSchemaTableName(tableIdentifier, catalogFactory.isNestedNamespaceEnabled()))
.collect(toList());
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties)
{
catalogFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName)),
catalogFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName), catalogFactory.isNestedNamespaceEnabled()),
properties.entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> e.getValue().toString())));
}
Expand All @@ -179,7 +187,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
public void dropSchema(ConnectorSession session, String schemaName)
{
try {
catalogFactory.getNamespaces(session).dropNamespace(toIcebergNamespace(Optional.of(schemaName)));
catalogFactory.getNamespaces(session).dropNamespace(toIcebergNamespace(Optional.of(schemaName), catalogFactory.isNestedNamespaceEnabled()));
}
catch (NamespaceNotEmptyException e) {
throw new PrestoException(SCHEMA_NOT_EMPTY, "Schema not empty: " + schemaName);
Expand All @@ -200,9 +208,9 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views");
}
Schema schema = toIcebergSchema(viewMetadata.getColumns());
ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(toIcebergTableIdentifier(viewMetadata.getTable()))
ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(toIcebergTableIdentifier(viewMetadata.getTable(), catalogFactory.isNestedNamespaceEnabled()))
.withSchema(schema)
.withDefaultNamespace(toIcebergNamespace(Optional.ofNullable(viewMetadata.getTable().getSchemaName())))
.withDefaultNamespace(toIcebergNamespace(Optional.ofNullable(viewMetadata.getTable().getSchemaName()), catalogFactory.isNestedNamespaceEnabled()))
.withQuery(VIEW_DIALECT, viewData)
.withProperties(createIcebergViewProperties(session, nodeVersion.toString()));
if (replace) {
Expand All @@ -221,7 +229,8 @@ public List<SchemaTableName> listViews(ConnectorSession session, Optional<String
if (catalog instanceof ViewCatalog) {
for (String schema : listSchemas(session, schemaName.orElse(null))) {
try {
for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(toIcebergNamespace(Optional.ofNullable(schema)))) {
for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(
toIcebergNamespace(Optional.ofNullable(schema), catalogFactory.isNestedNamespaceEnabled()))) {
tableNames.add(new SchemaTableName(schema, tableIdentifier.name()));
}
}
Expand Down Expand Up @@ -257,8 +266,9 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s

for (SchemaTableName schemaTableName : tableNames) {
try {
if (((ViewCatalog) catalog).viewExists(toIcebergTableIdentifier(schemaTableName))) {
View view = ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(schemaTableName));
TableIdentifier viewIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
if (((ViewCatalog) catalog).viewExists(viewIdentifier)) {
View view = ((ViewCatalog) catalog).loadView(viewIdentifier);
verifyAndPopulateViews(view, schemaTableName, view.sqlFor(VIEW_DIALECT).sql(), views);
}
}
Expand All @@ -277,7 +287,7 @@ public void dropView(ConnectorSession session, SchemaTableName viewName)
if (!(catalog instanceof ViewCatalog)) {
throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views");
}
((ViewCatalog) catalog).dropView(toIcebergTableIdentifier(viewName));
((ViewCatalog) catalog).dropView(toIcebergTableIdentifier(viewName, catalogFactory.isNestedNamespaceEnabled()));
}

private void verifyAndPopulateViews(View view, SchemaTableName schemaTableName, String viewData, ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> views)
Expand All @@ -304,7 +314,10 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

try {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat, session));
toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()),
schema,
partitionSpec,
populateTableProperties(tableMetadata, fileFormat, session));
}
catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(schemaTableName);
Expand All @@ -328,7 +341,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
TableIdentifier tableIdentifier = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName());
TableIdentifier tableIdentifier = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName(), catalogFactory.isNestedNamespaceEnabled());
catalogFactory.getCatalog(session).dropTable(tableIdentifier);
}

Expand All @@ -337,20 +350,20 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
TableIdentifier from = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName());
TableIdentifier to = toIcebergTableIdentifier(newTable);
TableIdentifier from = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName(), catalogFactory.isNestedNamespaceEnabled());
TableIdentifier to = toIcebergTableIdentifier(newTable, catalogFactory.isNestedNamespaceEnabled());
catalogFactory.getCatalog(session).renameTable(from, to);
}

@Override
public void registerTable(ConnectorSession clientSession, SchemaTableName schemaTableName, Path metadataLocation)
{
catalogFactory.getCatalog(clientSession).registerTable(toIcebergTableIdentifier(schemaTableName), metadataLocation.toString());
catalogFactory.getCatalog(clientSession).registerTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), metadataLocation.toString());
}

@Override
public void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName)
{
catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName), false);
catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnv

public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
{
return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table));
return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table, catalogFactory.isNestedNamespaceEnabled()));
}

public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
Expand All @@ -263,7 +263,7 @@ public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFacto
if (!(catalog instanceof ViewCatalog)) {
throw new PrestoException(NOT_SUPPORTED, "This connector does not support get views");
}
return ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(table));
return ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(table, catalogFactory.isNestedNamespaceEnabled()));
}

public static List<IcebergColumnHandle> getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class IcebergRestCatalogFactory
private final IcebergRestConfig catalogConfig;
private final NodeVersion nodeVersion;
private final String catalogName;
private final boolean nestedNamespaceEnabled;

@Inject
public IcebergRestCatalogFactory(
Expand All @@ -73,6 +74,7 @@ public IcebergRestCatalogFactory(
this.catalogConfig = requireNonNull(catalogConfig, "catalogConfig is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName();
this.nestedNamespaceEnabled = catalogConfig.isNestedNamespaceEnabled();
}

@Override
Expand Down Expand Up @@ -135,6 +137,12 @@ protected Map<String, String> getCatalogProperties(ConnectorSession session)
return properties.build();
}

@Override
public boolean isNestedNamespaceEnabled()
{
return this.nestedNamespaceEnabled;
}

protected SessionContext convertSession(ConnectorSession session)
{
RestSessionBuilder sessionContextBuilder = catalogConfig.getSessionType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class IcebergRestConfig
private String credential;
private String token;
private String scope;
private boolean nestedNamespaceEnabled = true;

@NotNull
public Optional<String> getServerUri()
Expand Down Expand Up @@ -122,6 +123,19 @@ public IcebergRestConfig setScope(String scope)
return this;
}

public boolean isNestedNamespaceEnabled()
{
return nestedNamespaceEnabled;
}

@Config("iceberg.rest.nested.namespace.enabled")
@ConfigDescription("Allows querying nested namespaces. Default: true")
public IcebergRestConfig setNestedNamespaceEnabled(boolean nestedNamespaceEnabled)
{
this.nestedNamespaceEnabled = nestedNamespaceEnabled;
return this;
}

public boolean credentialOrTokenExists()
{
return credential != null || token != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
*/
package com.facebook.presto.iceberg.util;

import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Splitter;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.Optional;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;

public class IcebergPrestoModelConverters
{
private static final String NAMESPACE_SEPARATOR = ".";
Expand All @@ -34,26 +38,38 @@ public static String toPrestoSchemaName(Namespace icebergNamespace)
return icebergNamespace.toString();
}

public static Namespace toIcebergNamespace(Optional<String> prestoSchemaName)
public static Namespace toIcebergNamespace(Optional<String> prestoSchemaName, boolean nestedNamespaceEnabled)
{
if (prestoSchemaName.isPresent()) {
checkNestedNamespaceSupport(prestoSchemaName.get(), nestedNamespaceEnabled);
return Namespace.of(NAMESPACE_SPLITTER.splitToList(prestoSchemaName.get()).toArray(new String[0]));
}
return Namespace.empty();
}

public static SchemaTableName toPrestoSchemaTableName(TableIdentifier icebergTableIdentifier)
public static SchemaTableName toPrestoSchemaTableName(TableIdentifier icebergTableIdentifier, boolean nestedNamespaceEnabled)
{
return new SchemaTableName(icebergTableIdentifier.namespace().toString(), icebergTableIdentifier.name());
String schemaName = icebergTableIdentifier.namespace().toString();
checkNestedNamespaceSupport(schemaName, nestedNamespaceEnabled);
return new SchemaTableName(schemaName, icebergTableIdentifier.name());
}

public static TableIdentifier toIcebergTableIdentifier(SchemaTableName prestoSchemaTableName)
public static TableIdentifier toIcebergTableIdentifier(SchemaTableName prestoSchemaTableName, boolean nestedNamespaceEnabled)
{
return toIcebergTableIdentifier(toIcebergNamespace(Optional.ofNullable(prestoSchemaTableName.getSchemaName())), prestoSchemaTableName.getTableName());
return toIcebergTableIdentifier(toIcebergNamespace(Optional.ofNullable(prestoSchemaTableName.getSchemaName()), nestedNamespaceEnabled),
prestoSchemaTableName.getTableName(), nestedNamespaceEnabled);
}

public static TableIdentifier toIcebergTableIdentifier(Namespace icebergNamespace, String prestoTableName)
public static TableIdentifier toIcebergTableIdentifier(Namespace icebergNamespace, String prestoTableName, boolean nestedNamespaceEnabled)
{
checkNestedNamespaceSupport(icebergNamespace.toString(), nestedNamespaceEnabled);
return TableIdentifier.of(icebergNamespace, prestoTableName);
}

private static void checkNestedNamespaceSupport(String schemaName, boolean nestedNamespaceEnabled)
{
if (!nestedNamespaceEnabled && schemaName.contains(NAMESPACE_SEPARATOR)) {
throw new PrestoException(NOT_SUPPORTED, format("Nested namespaces are disabled. Schema %s is not valid", schemaName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void testDefaults()
.setCredential(null)
.setToken(null)
.setScope(null)
.setSessionType(null));
.setSessionType(null)
.setNestedNamespaceEnabled(true));
}

@Test
Expand All @@ -50,6 +51,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.rest.auth.oauth2.token", "SXVLUXUhIExFQ0tFUiEK")
.put("iceberg.rest.auth.oauth2.scope", "PRINCIPAL_ROLE:ALL")
.put("iceberg.rest.session.type", "USER")
.put("iceberg.rest.nested.namespace.enabled", "false")
.build();

IcebergRestConfig expected = new IcebergRestConfig()
Expand All @@ -59,7 +61,8 @@ public void testExplicitPropertyMappings()
.setCredential("key:secret")
.setToken("SXVLUXUhIExFQ0tFUiEK")
.setScope("PRINCIPAL_ROLE:ALL")
.setSessionType(USER);
.setSessionType(USER)
.setNestedNamespaceEnabled(false);

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit ae16c69

Please sign in to comment.