Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nested namespaces in Iceberg's REST Catalog #24083

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ Property Name Description
``iceberg.rest.auth.oauth2.credential``.
Example: ``PRINCIPAL_ROLE:ALL``

``iceberg.rest.nested.namespace.enabled`` In REST catalogs, tables are grouped into namespaces, which can be
nested. If a large number of nested namespaces results in
lower performance, querying nested namespaces can be disabled.
Defaults to ``true``.

``iceberg.rest.session.type`` The session type to use when communicating with the REST catalog.
Available values are ``NONE`` or ``USER`` (default: ``NONE``).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public SupportsNamespaces getNamespaces(ConnectorSession session)
throw new PrestoException(NOT_SUPPORTED, "Iceberg catalog of type " + catalogType + " does not support namespace operations");
}

/**
 * Reports whether nested (multi-level) namespaces are enabled for this catalog factory.
 *
 * <p>This implementation always returns {@code false}; the method is public and
 * non-final, so a subclass may override it to report a different value.
 *
 * @return {@code false}
 */
public boolean isNestedNamespaceEnabled()
{
return false;
}

protected String getCacheKey(ConnectorSession session)
{
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;

import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
Expand All @@ -71,6 +72,8 @@
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toPrestoSchemaName;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toPrestoSchemaTableName;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -137,12 +140,26 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTa
public List<String> listSchemaNames(ConnectorSession session)
{
    // Lists the schema names visible through the session's catalog. Depending on
    // the factory's nested-namespace flag, this is either only what the no-argument
    // listNamespaces() call returns, or every namespace reachable by recursively
    // descending from the empty (root) namespace.
    SupportsNamespaces namespaces = catalogFactory.getNamespaces(session);

    // Nested namespaces disabled: a single listing call, no recursion.
    if (!catalogFactory.isNestedNamespaceEnabled()) {
        List<Namespace> topLevel = namespaces.listNamespaces();
        return topLevel.stream()
                .map(IcebergPrestoModelConverters::toPrestoSchemaName)
                .collect(toList());
    }

    // Nested namespaces enabled: walk the whole hierarchy starting at the root.
    return listNestedNamespaces(namespaces, Namespace.empty());
}

/**
 * Recursively collects the Presto schema names of every namespace below
 * {@code parentNamespace}, in depth-first pre-order: each child is emitted
 * first, immediately followed by all of its own descendants.
 *
 * <p>One {@code listNamespaces} call is issued per namespace visited.
 *
 * @param supportsNamespaces namespace-capable catalog to query
 * @param parentNamespace namespace whose descendants are listed; it is not itself included
 * @return schema names of all descendants of {@code parentNamespace}
 */
private List<String> listNestedNamespaces(SupportsNamespaces supportsNamespaces, Namespace parentNamespace)
{
    List<Namespace> children = supportsNamespaces.listNamespaces(parentNamespace);
    return children.stream()
            .flatMap(child -> {
                // The child's own name comes first, then everything beneath it.
                Stream<String> self = Stream.of(toPrestoSchemaName(child));
                Stream<String> descendants = listNestedNamespaces(supportsNamespaces, child).stream();
                return Stream.concat(self, descendants);
            })
            .collect(toList());
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
Expand All @@ -152,16 +169,16 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
.collect(toList());
}

return catalogFactory.getCatalog(session).listTables(toIcebergNamespace(schemaName))
return catalogFactory.getCatalog(session).listTables(toIcebergNamespace(schemaName, catalogFactory.isNestedNamespaceEnabled()))
.stream()
.map(IcebergPrestoModelConverters::toPrestoSchemaTableName)
.map(tableIdentifier -> toPrestoSchemaTableName(tableIdentifier, catalogFactory.isNestedNamespaceEnabled()))
.collect(toList());
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties)
{
catalogFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName)),
catalogFactory.getNamespaces(session).createNamespace(toIcebergNamespace(Optional.of(schemaName), catalogFactory.isNestedNamespaceEnabled()),
properties.entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> e.getValue().toString())));
}
Expand All @@ -170,7 +187,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
public void dropSchema(ConnectorSession session, String schemaName)
{
try {
catalogFactory.getNamespaces(session).dropNamespace(toIcebergNamespace(Optional.of(schemaName)));
catalogFactory.getNamespaces(session).dropNamespace(toIcebergNamespace(Optional.of(schemaName), catalogFactory.isNestedNamespaceEnabled()));
}
catch (NamespaceNotEmptyException e) {
throw new PrestoException(SCHEMA_NOT_EMPTY, "Schema not empty: " + schemaName);
Expand All @@ -191,9 +208,9 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views");
}
Schema schema = toIcebergSchema(viewMetadata.getColumns());
ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(TableIdentifier.of(viewMetadata.getTable().getSchemaName(), viewMetadata.getTable().getTableName()))
ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(toIcebergTableIdentifier(viewMetadata.getTable(), catalogFactory.isNestedNamespaceEnabled()))
.withSchema(schema)
.withDefaultNamespace(Namespace.of(viewMetadata.getTable().getSchemaName()))
.withDefaultNamespace(toIcebergNamespace(Optional.ofNullable(viewMetadata.getTable().getSchemaName()), catalogFactory.isNestedNamespaceEnabled()))
.withQuery(VIEW_DIALECT, viewData)
.withProperties(createIcebergViewProperties(session, nodeVersion.toString()));
if (replace) {
Expand All @@ -212,7 +229,8 @@ public List<SchemaTableName> listViews(ConnectorSession session, Optional<String
if (catalog instanceof ViewCatalog) {
for (String schema : listSchemas(session, schemaName.orElse(null))) {
try {
for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(Namespace.of(schema))) {
for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(
toIcebergNamespace(Optional.ofNullable(schema), catalogFactory.isNestedNamespaceEnabled()))) {
tableNames.add(new SchemaTableName(schema, tableIdentifier.name()));
}
}
Expand Down Expand Up @@ -248,8 +266,9 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s

for (SchemaTableName schemaTableName : tableNames) {
try {
if (((ViewCatalog) catalog).viewExists(TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()))) {
View view = ((ViewCatalog) catalog).loadView(TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()));
TableIdentifier viewIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
if (((ViewCatalog) catalog).viewExists(viewIdentifier)) {
View view = ((ViewCatalog) catalog).loadView(viewIdentifier);
verifyAndPopulateViews(view, schemaTableName, view.sqlFor(VIEW_DIALECT).sql(), views);
}
}
Expand All @@ -268,7 +287,7 @@ public void dropView(ConnectorSession session, SchemaTableName viewName)
if (!(catalog instanceof ViewCatalog)) {
throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views");
}
((ViewCatalog) catalog).dropView(TableIdentifier.of(viewName.getSchemaName(), viewName.getTableName()));
((ViewCatalog) catalog).dropView(toIcebergTableIdentifier(viewName, catalogFactory.isNestedNamespaceEnabled()));
}

private void verifyAndPopulateViews(View view, SchemaTableName schemaTableName, String viewData, ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> views)
Expand All @@ -295,7 +314,10 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

try {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat, session));
toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()),
schema,
partitionSpec,
populateTableProperties(tableMetadata, fileFormat, session));
}
catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(schemaTableName);
Expand All @@ -319,7 +341,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be dropped");
TableIdentifier tableIdentifier = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName());
TableIdentifier tableIdentifier = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName(), catalogFactory.isNestedNamespaceEnabled());
catalogFactory.getCatalog(session).dropTable(tableIdentifier);
}

Expand All @@ -328,20 +350,20 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
verify(icebergTableHandle.getIcebergTableName().getTableType() == DATA, "only the data table can be renamed");
TableIdentifier from = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName());
TableIdentifier to = toIcebergTableIdentifier(newTable);
TableIdentifier from = toIcebergTableIdentifier(icebergTableHandle.getSchemaTableName(), catalogFactory.isNestedNamespaceEnabled());
TableIdentifier to = toIcebergTableIdentifier(newTable, catalogFactory.isNestedNamespaceEnabled());
catalogFactory.getCatalog(session).renameTable(from, to);
}

@Override
public void registerTable(ConnectorSession clientSession, SchemaTableName schemaTableName, Path metadataLocation)
{
catalogFactory.getCatalog(clientSession).registerTable(toIcebergTableIdentifier(schemaTableName), metadataLocation.toString());
catalogFactory.getCatalog(clientSession).registerTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), metadataLocation.toString());
}

@Override
public void unregisterTable(ConnectorSession clientSession, SchemaTableName schemaTableName)
{
catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName), false);
catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public static Table getHiveIcebergTable(ExtendedHiveMetastore metastore, HdfsEnv

public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
{
return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table));
return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table, catalogFactory.isNestedNamespaceEnabled()));
}

public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
Expand All @@ -263,7 +263,7 @@ public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFacto
if (!(catalog instanceof ViewCatalog)) {
throw new PrestoException(NOT_SUPPORTED, "This connector does not support get views");
}
return ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(table));
return ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(table, catalogFactory.isNestedNamespaceEnabled()));
}

public static List<IcebergColumnHandle> getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class IcebergRestCatalogFactory
private final IcebergRestConfig catalogConfig;
private final NodeVersion nodeVersion;
private final String catalogName;
private final boolean nestedNamespaceEnabled;

@Inject
public IcebergRestCatalogFactory(
Expand All @@ -73,6 +74,7 @@ public IcebergRestCatalogFactory(
this.catalogConfig = requireNonNull(catalogConfig, "catalogConfig is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName();
this.nestedNamespaceEnabled = catalogConfig.isNestedNamespaceEnabled();
}

@Override
Expand Down Expand Up @@ -135,6 +137,12 @@ protected Map<String, String> getCatalogProperties(ConnectorSession session)
return properties.build();
}

/**
 * Reports whether nested namespaces are enabled for this REST catalog.
 *
 * @return the flag read from the REST catalog configuration when this factory was constructed
 */
@Override
public boolean isNestedNamespaceEnabled()
{
    return nestedNamespaceEnabled;
}

protected SessionContext convertSession(ConnectorSession session)
{
RestSessionBuilder sessionContextBuilder = catalogConfig.getSessionType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class IcebergRestConfig
private String credential;
private String token;
private String scope;
private boolean nestedNamespaceEnabled = true;

@NotNull
public Optional<String> getServerUri()
Expand Down Expand Up @@ -122,6 +123,19 @@ public IcebergRestConfig setScope(String scope)
return this;
}

/**
 * Returns whether querying nested namespaces is allowed for the REST catalog.
 *
 * @return the configured flag; the field is initialized to {@code true}
 */
public boolean isNestedNamespaceEnabled()
{
return nestedNamespaceEnabled;
}

/**
 * Sets whether querying nested namespaces is allowed for the REST catalog.
 * Bound to the {@code iceberg.rest.nested.namespace.enabled} configuration property.
 *
 * @param nestedNamespaceEnabled {@code true} to allow nested namespaces, {@code false} to disallow them
 * @return this config instance, so setters can be chained
 */
@Config("iceberg.rest.nested.namespace.enabled")
@ConfigDescription("Allows querying nested namespaces. Default: true")
public IcebergRestConfig setNestedNamespaceEnabled(boolean nestedNamespaceEnabled)
{
this.nestedNamespaceEnabled = nestedNamespaceEnabled;
return this;
}

public boolean credentialOrTokenExists()
{
return credential != null || token != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,22 @@
*/
package com.facebook.presto.iceberg.util;

import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Splitter;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.Optional;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;

public class IcebergPrestoModelConverters
{
private static final String NAMESPACE_SEPARATOR = ".";
private static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR).omitEmptyStrings().trimResults();

private IcebergPrestoModelConverters()
{
}
Expand All @@ -30,23 +38,38 @@ public static String toPrestoSchemaName(Namespace icebergNamespace)
return icebergNamespace.toString();
}

public static Namespace toIcebergNamespace(Optional<String> prestoSchemaName)
/**
 * Converts an optional Presto schema name into an Iceberg {@link Namespace}.
 *
 * <p>An absent schema name maps to the empty namespace. A present name is first
 * validated against the nested-namespace setting and then split on
 * {@code NAMESPACE_SEPARATOR} into namespace levels; the splitter trims each
 * level and omits empty ones.
 *
 * @param prestoSchemaName schema name as seen by Presto, possibly absent
 * @param nestedNamespaceEnabled whether schema names containing the separator are permitted
 * @return the corresponding Iceberg namespace
 * @throws PrestoException with {@code NOT_SUPPORTED} if nested namespaces are disabled
 *         and the schema name contains the separator
 */
public static Namespace toIcebergNamespace(Optional<String> prestoSchemaName, boolean nestedNamespaceEnabled)
{
    if (!prestoSchemaName.isPresent()) {
        return Namespace.empty();
    }

    String schemaName = prestoSchemaName.get();
    checkNestedNamespaceSupport(schemaName, nestedNamespaceEnabled);

    String[] levels = NAMESPACE_SPLITTER.splitToList(schemaName).toArray(new String[0]);
    return Namespace.of(levels);
}

public static SchemaTableName toPrestoSchemaTableName(TableIdentifier icebergTableIdentifier, boolean nestedNamespaceEnabled)
{
return prestoSchemaName.map(Namespace::of).orElse(Namespace.empty());
String schemaName = icebergTableIdentifier.namespace().toString();
checkNestedNamespaceSupport(schemaName, nestedNamespaceEnabled);
return new SchemaTableName(schemaName, icebergTableIdentifier.name());
}

public static SchemaTableName toPrestoSchemaTableName(TableIdentifier icebergTableIdentifier)
public static TableIdentifier toIcebergTableIdentifier(SchemaTableName prestoSchemaTableName, boolean nestedNamespaceEnabled)
{
return new SchemaTableName(icebergTableIdentifier.namespace().toString(), icebergTableIdentifier.name());
return toIcebergTableIdentifier(toIcebergNamespace(Optional.ofNullable(prestoSchemaTableName.getSchemaName()), nestedNamespaceEnabled),
prestoSchemaTableName.getTableName(), nestedNamespaceEnabled);
}

public static TableIdentifier toIcebergTableIdentifier(SchemaTableName prestoSchemaTableName)
public static TableIdentifier toIcebergTableIdentifier(Namespace icebergNamespace, String prestoTableName, boolean nestedNamespaceEnabled)
{
return toIcebergTableIdentifier(prestoSchemaTableName.getSchemaName(), prestoSchemaTableName.getTableName());
checkNestedNamespaceSupport(icebergNamespace.toString(), nestedNamespaceEnabled);
return TableIdentifier.of(icebergNamespace, prestoTableName);
}

public static TableIdentifier toIcebergTableIdentifier(String prestoSchemaName, String prestoTableName)
private static void checkNestedNamespaceSupport(String schemaName, boolean nestedNamespaceEnabled)
{
return TableIdentifier.of(prestoSchemaName, prestoTableName);
if (!nestedNamespaceEnabled && schemaName.contains(NAMESPACE_SEPARATOR)) {
throw new PrestoException(NOT_SUPPORTED, format("Nested namespaces are disabled. Schema %s is not valid", schemaName));
}
}
}
Loading
Loading