Commit 75dce51

[Iceberg]Support setting warehouse data directory for Hadoop catalog
1 parent fe97297 commit 75dce51

27 files changed, +1155 −124 lines

presto-docs/src/main/sphinx/connector/iceberg.rst  (+37)

@@ -256,15 +256,52 @@ Property Name Description
 =======================================================  =============================================================  ============
 ``iceberg.catalog.warehouse``                            The catalog warehouse root path for Iceberg tables.

+                                                         The Hadoop catalog requires a file system that supports
+                                                         an atomic rename operation, such as HDFS, to maintain
+                                                         metadata files in order to implement an atomic transaction
+                                                         commit.
+
                                                          Example: ``hdfs://nn:8020/warehouse/path``
+
+                                                         Do not set ``iceberg.catalog.warehouse`` to a path in object
+                                                         stores or local file systems in the production environment.
+
                                                          This property is required if the ``iceberg.catalog.type`` is
                                                          ``hadoop``. Otherwise, it will be ignored.

+``iceberg.catalog.hadoop.warehouse.datadir``             The catalog warehouse root data path for Iceberg tables.
+                                                         It is only supported with the Hadoop catalog.
+
+                                                         Example: ``s3://iceberg_bucket/warehouse``.
+
+                                                         This optional property can be set to a path in object
+                                                         stores or HDFS.
+                                                         If set, all tables in this Hadoop catalog default to saving
+                                                         their data and delete files in the specified root
+                                                         data directory.
+
 ``iceberg.catalog.cached-catalog-num``                   The number of Iceberg catalogs to cache. This property is     ``10``
                                                          required if the ``iceberg.catalog.type`` is ``hadoop``.
                                                          Otherwise, it will be ignored.
 =======================================================  =============================================================  ============

+Configure the `Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
+properties to specify a S3 location as the warehouse data directory for the Hadoop catalog. This way,
+the data and delete files of Iceberg tables are stored in S3. An example configuration includes:
+
+.. code-block:: none
+
+    connector.name=iceberg
+    iceberg.catalog.type=hadoop
+    iceberg.catalog.warehouse=hdfs://nn:8020/warehouse/path
+    iceberg.catalog.hadoop.warehouse.datadir=s3://iceberg_bucket/warehouse
+
+    hive.s3.use-instance-credentials=false
+    hive.s3.aws-access-key=accesskey
+    hive.s3.aws-secret-key=secretkey
+    hive.s3.endpoint=http://192.168.0.103:9878
+    hive.s3.path-style-access=true
+
 Configuration Properties
 ------------------------

presto-iceberg/pom.xml  (+22)

@@ -263,6 +263,16 @@
         </exclusions>
     </dependency>

+    <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-core</artifactId>
+    </dependency>
+
+    <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-s3</artifactId>
+    </dependency>
+
     <dependency>
         <groupId>org.apache.iceberg</groupId>
         <artifactId>iceberg-core</artifactId>

@@ -598,6 +608,18 @@
         <scope>test</scope>
     </dependency>

+    <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>testcontainers</artifactId>
+        <scope>test</scope>
+        <exclusions>
+            <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+
     <dependency>
         <groupId>org.apache.iceberg</groupId>
         <artifactId>iceberg-core</artifactId>

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java  (+72)

@@ -123,6 +123,7 @@
 import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
 import static com.facebook.presto.hive.MetadataUtils.getPredicate;
 import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate;
+import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
 import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
 import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
 import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;

@@ -146,6 +147,9 @@
 import static com.facebook.presto.iceberg.IcebergTableProperties.METRICS_MAX_INFERRED_COLUMN;
 import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
 import static com.facebook.presto.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
+import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
+import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
+import static com.facebook.presto.iceberg.IcebergTableProperties.getWriteDataLocation;
 import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
 import static com.facebook.presto.iceberg.IcebergTableType.DATA;
 import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;

@@ -166,6 +170,7 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getUpdateMode;
 import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
 import static com.facebook.presto.iceberg.IcebergUtil.isMetadataDeleteAfterCommit;
+import static com.facebook.presto.iceberg.IcebergUtil.parseFormatVersion;
 import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
 import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;

@@ -206,8 +211,14 @@
 import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
 import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
 import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
+import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
+import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;

@@ -1152,6 +1163,62 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
         transaction.commitTransaction();
     }

+    protected Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, com.facebook.presto.iceberg.FileFormat fileFormat, ConnectorSession session)
+    {
+        ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(16);
+
+        String writeDataLocation = getWriteDataLocation(tableMetadata.getProperties());
+        if (!isNullOrEmpty(writeDataLocation)) {
+            propertiesBuilder.put(WRITE_DATA_LOCATION, writeDataLocation);
+        }
+        else {
+            Optional<String> dataLocation = getDataLocationBasedOnWarehouseDataDir(tableMetadata.getTable());
+            dataLocation.ifPresent(location -> propertiesBuilder.put(WRITE_DATA_LOCATION, location));
+        }
+
+        Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
+        propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
+        propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
+        switch (fileFormat) {
+            case PARQUET:
+                propertiesBuilder.put(PARQUET_COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().get().toString());
+                break;
+            case ORC:
+                propertiesBuilder.put(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name());
+                break;
+        }
+        if (tableMetadata.getComment().isPresent()) {
+            propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
+        }
+
+        String formatVersion = getFormatVersion(tableMetadata.getProperties());
+        verify(formatVersion != null, "Format version cannot be null");
+        propertiesBuilder.put(TableProperties.FORMAT_VERSION, formatVersion);
+
+        if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) {
+            propertiesBuilder.put(TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
+            propertiesBuilder.put(TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
+        }
+        else {
+            RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties());
+            propertiesBuilder.put(TableProperties.DELETE_MODE, deleteMode.modeName());
+            RowLevelOperationMode updateMode = IcebergTableProperties.getUpdateMode(tableMetadata.getProperties());
+            propertiesBuilder.put(TableProperties.UPDATE_MODE, updateMode.modeName());
+        }
+
+        Integer metadataPreviousVersionsMax = IcebergTableProperties.getMetadataPreviousVersionsMax(tableMetadata.getProperties());
+        propertiesBuilder.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionsMax));
+
+        Boolean metadataDeleteAfterCommit = IcebergTableProperties.isMetadataDeleteAfterCommit(tableMetadata.getProperties());
+        propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(metadataDeleteAfterCommit));
+
+        Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
+        propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));
+
+        propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));
+        return propertiesBuilder.build();
+    }
+
     /**
      * Deletes all the files for a specific predicate
      *

@@ -1277,4 +1344,9 @@ public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHan
                 handle.getSortOrder());
         finishWrite(session, outputTableHandle, fragments, UPDATE_AFTER);
     }
+
+    protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
+    {
+        return Optional.empty();
+    }
 }
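
The key behavioral change above is how populateTableProperties picks Iceberg's WRITE_DATA_LOCATION: an explicitly supplied per-table write-data-location property still wins, and only when it is absent does the catalog-level warehouse data directory supply a per-table default (and when neither is present, no location is set and Iceberg uses its normal layout). A minimal, self-contained sketch of that precedence; the class and helper names here (WriteDataLocationPrecedence, resolveWriteDataLocation) are illustrative, not part of the commit:

    import java.util.Optional;

    public class WriteDataLocationPrecedence
    {
        // Illustrative only: mirrors the precedence used in populateTableProperties.
        // 1. An explicit per-table write-data-location property wins.
        // 2. Otherwise, fall back to a location derived from the catalog-level
        //    iceberg.catalog.hadoop.warehouse.datadir, when one is configured.
        // 3. Otherwise, nothing is set and Iceberg uses its default data layout.
        static Optional<String> resolveWriteDataLocation(String tableProperty, Optional<String> catalogDefault)
        {
            if (tableProperty != null && !tableProperty.isEmpty()) {
                return Optional.of(tableProperty);
            }
            return catalogDefault;
        }

        public static void main(String[] args)
        {
            Optional<String> catalogDefault = Optional.of("s3://iceberg_bucket/warehouse/tpch/orders");
            // Explicit table property takes precedence over the catalog-level default.
            System.out.println(resolveWriteDataLocation("s3://other_bucket/custom", catalogDefault));
            // No table property: the catalog-derived default is used.
            System.out.println(resolveWriteDataLocation(null, catalogDefault));
            // Neither configured: empty, so Iceberg keeps its default layout under the table location.
            System.out.println(resolveWriteDataLocation(null, Optional.empty()));
        }
    }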

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java  (+5)

@@ -99,6 +99,8 @@
 import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
 import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
 import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static com.facebook.presto.common.Utils.checkArgument;
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
 import static com.facebook.presto.orc.StripeMetadataSource.CacheableRowGroupIndices;
 import static com.facebook.presto.orc.StripeMetadataSource.CacheableSlice;
 import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

@@ -142,6 +144,9 @@ protected void setup(Binder binder)

         configBinder(binder).bindConfig(IcebergConfig.class);

+        IcebergConfig icebergConfig = buildConfigObject(IcebergConfig.class);
+        checkArgument(icebergConfig.getCatalogType().equals(HADOOP) || icebergConfig.getCatalogWarehouseDataDir() == null, "'iceberg.catalog.hadoop.warehouse.datadir' can only be specified in Hadoop catalog");
+
         binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON);
         newOptionalBinder(binder, IcebergNessieConfig.class); // bind optional Nessie config to IcebergSessionProperties
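
Because the property only applies to the Hadoop catalog, the module now fails fast at connector startup whenever it is set together with any other catalog type. A stripped-down sketch of that guard, using a hypothetical stand-in enum instead of Presto's CatalogType:

    public class WarehouseDataDirValidation
    {
        // Hypothetical stand-in for com.facebook.presto.iceberg.CatalogType.
        enum CatalogType { HIVE, HADOOP }

        // Mirrors the checkArgument added in IcebergCommonModule.setup(): a non-null
        // data directory is only accepted when the catalog type is HADOOP.
        static void validate(CatalogType catalogType, String warehouseDataDir)
        {
            if (catalogType != CatalogType.HADOOP && warehouseDataDir != null) {
                throw new IllegalArgumentException(
                        "'iceberg.catalog.hadoop.warehouse.datadir' can only be specified in Hadoop catalog");
            }
        }

        public static void main(String[] args)
        {
            validate(CatalogType.HADOOP, "s3://iceberg_bucket/warehouse"); // accepted
            validate(CatalogType.HIVE, null);                              // accepted: property not set
            validate(CatalogType.HIVE, "s3://iceberg_bucket/warehouse");   // throws IllegalArgumentException
        }
    }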

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java  (+14)

@@ -50,6 +50,7 @@ public class IcebergConfig
     private HiveCompressionCodec compressionCodec = GZIP;
     private CatalogType catalogType = HIVE;
     private String catalogWarehouse;
+    private String catalogWarehouseDataDir;
     private int catalogCacheSize = 10;
     private int maxPartitionsPerWriter = 100;
     private List<String> hadoopConfigResources = ImmutableList.of();

@@ -127,6 +128,19 @@ public IcebergConfig setCatalogWarehouse(String catalogWarehouse)
         return this;
     }

+    public String getCatalogWarehouseDataDir()
+    {
+        return catalogWarehouseDataDir;
+    }
+
+    @Config("iceberg.catalog.hadoop.warehouse.datadir")
+    @ConfigDescription("Iceberg catalog default root data writing directory. This is only supported with Hadoop catalog.")
+    public IcebergConfig setCatalogWarehouseDataDir(String catalogWarehouseDataDir)
+    {
+        this.catalogWarehouseDataDir = catalogWarehouseDataDir;
+        return this;
+    }
+
     @Min(1)
     public int getCatalogCacheSize()
     {
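
The config class only carries the raw string: when the property is unset it stays null, which is how the module-level guard and the Hadoop-only fallback distinguish "not configured" from a real location. A minimal usage sketch of the new getter and setter (the project's config tests would typically also verify the iceberg.catalog.hadoop.warehouse.datadir mapping, but that is not shown in this commit excerpt):

    import com.facebook.presto.iceberg.IcebergConfig;

    public class CatalogWarehouseDataDirDefaults
    {
        public static void main(String[] args)
        {
            // Default: the data directory is null, i.e. not configured.
            IcebergConfig config = new IcebergConfig();
            System.out.println(config.getCatalogWarehouseDataDir()); // null

            // Setting it mirrors iceberg.catalog.hadoop.warehouse.datadir in the catalog properties file.
            config.setCatalogWarehouseDataDir("s3://iceberg_bucket/warehouse");
            System.out.println(config.getCatalogWarehouseDataDir()); // s3://iceberg_bucket/warehouse
        }
    }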

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java  (−1)

@@ -123,7 +123,6 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
 import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
-import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
 import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
 import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java  (+7)

@@ -52,6 +52,7 @@ public class IcebergNativeCatalogFactory
     private final String catalogName;
     protected final CatalogType catalogType;
     private final String catalogWarehouse;
+    private final String catalogWarehouseDataDir;
     protected final IcebergConfig icebergConfig;

     private final List<String> hadoopConfigResources;

@@ -69,6 +70,7 @@ public IcebergNativeCatalogFactory(
         this.icebergConfig = requireNonNull(config, "config is null");
         this.catalogType = config.getCatalogType();
         this.catalogWarehouse = config.getCatalogWarehouse();
+        this.catalogWarehouseDataDir = config.getCatalogWarehouseDataDir();
         this.hadoopConfigResources = icebergConfig.getHadoopConfigResources();
         this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null");
         this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null");

@@ -90,6 +92,11 @@ public Catalog getCatalog(ConnectorSession session)
         }
     }

+    public String getCatalogWarehouseDataDir()
+    {
+        return this.catalogWarehouseDataDir;
+    }
+
     public SupportsNamespaces getNamespaces(ConnectorSession session)
     {
         Catalog catalog = getCatalog(session);

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java  (+11 −1)

@@ -58,6 +58,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Stream;

+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
 import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
 import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;

@@ -70,7 +71,6 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
 import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
-import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
 import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
 import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
 import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;

@@ -95,6 +95,7 @@ public class IcebergNativeMetadata
 {
     private static final String VIEW_DIALECT = "presto";

+    private final Optional<String> warehouseDataDir;
     private final IcebergNativeCatalogFactory catalogFactory;
     private final CatalogType catalogType;
     private final ConcurrentMap<SchemaTableName, View> icebergViews = new ConcurrentHashMap<>();

@@ -113,6 +114,7 @@ public IcebergNativeMetadata(
         super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
         this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
         this.catalogType = requireNonNull(catalogType, "catalogType is null");
+        this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir());
     }

     @Override

@@ -404,4 +406,12 @@ public void unregisterTable(ConnectorSession clientSession, SchemaTableName sche
     {
         catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false);
     }
+
+    protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
+    {
+        if (!catalogType.equals(HADOOP)) {
+            return Optional.empty();
+        }
+        return warehouseDataDir.map(base -> base + schemaTableName.getSchemaName() + "/" + schemaTableName.getTableName());
+    }
 }
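
The per-table default data location is simply the configured data directory with the schema and table names appended, and only the Hadoop catalog derives one at all. Note that the concatenation in getDataLocationBasedOnWarehouseDataDir does not insert a separator between the base and the schema name, so the sketch below assumes a configured base that ends with "/"; the class and method names here are illustrative, not part of the commit:

    import java.util.Optional;

    public class WarehouseDataDirLayout
    {
        // Illustrative: mirrors IcebergNativeMetadata.getDataLocationBasedOnWarehouseDataDir.
        // Non-Hadoop catalogs return empty and keep Iceberg's normal data layout.
        static Optional<String> dataLocationFor(boolean isHadoopCatalog, Optional<String> warehouseDataDir,
                String schemaName, String tableName)
        {
            if (!isHadoopCatalog) {
                return Optional.empty();
            }
            // Assumes the configured base ends with "/", since the commit concatenates
            // base + schema + "/" + table without adding a separator after the base.
            return warehouseDataDir.map(base -> base + schemaName + "/" + tableName);
        }

        public static void main(String[] args)
        {
            System.out.println(dataLocationFor(true, Optional.of("s3://iceberg_bucket/warehouse/"), "tpch", "orders"));
            // -> Optional[s3://iceberg_bucket/warehouse/tpch/orders]
            System.out.println(dataLocationFor(false, Optional.of("s3://iceberg_bucket/warehouse/"), "tpch", "orders"));
            // -> Optional.empty: non-Hadoop catalogs ignore the setting
        }
    }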
