Skip to content

Commit fe97297

Browse files
committed
[Iceberg] Enable setting separate data write location on table creation
1 parent 56bf5e6 commit fe97297

File tree

7 files changed

+177
-13
lines changed

7 files changed

+177
-13
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

+6
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,12 @@ Property Name Description
370370
``location`` Optionally specifies the file system location URI for
371371
the table.
372372

373+
``write.data.path`` Optionally specifies the file system location URI for
374+
storing the data and delete files of the table. This only
375+
applies to files written after this property is set. Files
376+
previously written aren't relocated to reflect this
377+
parameter.
378+
373379
``format_version`` Optionally specifies the format version of the Iceberg ``2``
374380
specification to use for new tables, either ``1`` or ``2``.
375381

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,19 @@
153153
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
154154
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
155155
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
156+
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataPreviousVersionsMax;
157+
import static com.facebook.presto.iceberg.IcebergUtil.getMetricsMaxInferredColumn;
156158
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionFields;
157159
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
158160
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
159161
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
160162
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator;
161163
import static com.facebook.presto.iceberg.IcebergUtil.getSortFields;
164+
import static com.facebook.presto.iceberg.IcebergUtil.getSplitSize;
162165
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
166+
import static com.facebook.presto.iceberg.IcebergUtil.getUpdateMode;
163167
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
168+
import static com.facebook.presto.iceberg.IcebergUtil.isMetadataDeleteAfterCommit;
164169
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
165170
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
166171
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
@@ -188,6 +193,7 @@
188193
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
189194
import static com.facebook.presto.spi.StandardWarningCode.SORT_COLUMN_TRANSFORM_NOT_SUPPORTED_WARNING;
190195
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
196+
import static com.google.common.base.Strings.isNullOrEmpty;
191197
import static com.google.common.base.Verify.verify;
192198
import static com.google.common.collect.ImmutableList.toImmutableList;
193199
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -204,6 +210,7 @@
204210
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
205211
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
206212
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
213+
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
207214

208215
public abstract class IcebergAbstractMetadata
209216
implements ConnectorMetadata
@@ -716,12 +723,17 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
716723
properties.put(LOCATION_PROPERTY, icebergTable.location());
717724
}
718725

719-
properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable));
720-
properties.put(UPDATE_MODE, IcebergUtil.getUpdateMode(icebergTable));
721-
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
722-
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
723-
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
724-
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));
726+
String writeDataLocation = icebergTable.properties().get(WRITE_DATA_LOCATION);
727+
if (!isNullOrEmpty(writeDataLocation)) {
728+
properties.put(WRITE_DATA_LOCATION, writeDataLocation);
729+
}
730+
731+
properties.put(DELETE_MODE, getDeleteMode(icebergTable));
732+
properties.put(UPDATE_MODE, getUpdateMode(icebergTable));
733+
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, getMetadataPreviousVersionsMax(icebergTable));
734+
properties.put(METADATA_DELETE_AFTER_COMMIT, isMetadataDeleteAfterCommit(icebergTable));
735+
properties.put(METRICS_MAX_INFERRED_COLUMN, getMetricsMaxInferredColumn(icebergTable));
736+
properties.put(SPLIT_SIZE, getSplitSize(icebergTable));
725737

726738
SortOrder sortOrder = icebergTable.sortOrder();
727739
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -320,20 +320,21 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
320320
try {
321321
TableIdentifier tableIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
322322
String targetPath = getTableLocation(tableMetadata.getProperties());
323+
Map<String, String> tableProperties = populateTableProperties(tableMetadata, fileFormat, session);
323324
if (!isNullOrEmpty(targetPath)) {
324325
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
325326
tableIdentifier,
326327
schema,
327328
partitionSpec,
328329
targetPath,
329-
populateTableProperties(tableMetadata, fileFormat, session));
330+
tableProperties);
330331
}
331332
else {
332333
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
333334
tableIdentifier,
334335
schema,
335336
partitionSpec,
336-
populateTableProperties(tableMetadata, fileFormat, session));
337+
tableProperties);
337338
}
338339
}
339340
catch (AlreadyExistsException e) {

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java

+11
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static com.google.common.collect.ImmutableList.toImmutableList;
3535
import static java.util.Locale.ENGLISH;
3636
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
37+
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
3738

3839
public class IcebergTableProperties
3940
{
@@ -92,6 +93,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
9293
false,
9394
value -> (List<?>) value,
9495
value -> value))
96+
.add(stringProperty(
97+
WRITE_DATA_LOCATION,
98+
"File system location URI for the table's data and delete files",
99+
null,
100+
false))
95101
.add(stringProperty(
96102
FORMAT_VERSION,
97103
"Format version for the table",
@@ -182,6 +188,11 @@ public static String getTableLocation(Map<String, Object> tableProperties)
182188
return (String) tableProperties.get(LOCATION_PROPERTY);
183189
}
184190

191+
public static String getWriteDataLocation(Map<String, Object> tableProperties)
192+
{
193+
return (String) tableProperties.get(WRITE_DATA_LOCATION);
194+
}
195+
185196
public static String getFormatVersion(Map<String, Object> tableProperties)
186197
{
187198
return (String) tableProperties.get(FORMAT_VERSION);

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.apache.iceberg.StructLike;
7070
import org.apache.iceberg.Table;
7171
import org.apache.iceberg.TableOperations;
72-
import org.apache.iceberg.TableProperties;
7372
import org.apache.iceberg.TableScan;
7473
import org.apache.iceberg.catalog.Catalog;
7574
import org.apache.iceberg.catalog.ViewCatalog;
@@ -144,12 +143,14 @@
144143
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
145144
import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
146145
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
146+
import static com.facebook.presto.iceberg.IcebergTableProperties.getWriteDataLocation;
147147
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
148148
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
149149
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
150150
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
151151
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
152152
import static com.google.common.base.Preconditions.checkArgument;
153+
import static com.google.common.base.Strings.isNullOrEmpty;
153154
import static com.google.common.base.Verify.verify;
154155
import static com.google.common.collect.ImmutableList.toImmutableList;
155156
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -196,13 +197,17 @@
196197
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT;
197198
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS;
198199
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT;
200+
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
199201
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
200202
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
201203
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
202204
import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
203205
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
204206
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
207+
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
208+
import static org.apache.iceberg.TableProperties.WRITE_FOLDER_STORAGE_LOCATION;
205209
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
210+
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
206211
import static org.apache.iceberg.types.Type.TypeID.BINARY;
207212
import static org.apache.iceberg.types.Type.TypeID.FIXED;
208213

@@ -1143,6 +1148,12 @@ public void close()
11431148
public static Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat, ConnectorSession session)
11441149
{
11451150
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(5);
1151+
1152+
String writeDataLocation = getWriteDataLocation(tableMetadata.getProperties());
1153+
if (!isNullOrEmpty(writeDataLocation)) {
1154+
propertiesBuilder.put(WRITE_DATA_LOCATION, writeDataLocation);
1155+
}
1156+
11461157
Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
11471158
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
11481159
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
@@ -1265,7 +1276,7 @@ public static Optional<PartitionData> partitionDataFromStructLike(PartitionSpec
12651276
*/
12661277
public static String metadataLocation(Table icebergTable)
12671278
{
1268-
String metadataLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION);
1279+
String metadataLocation = icebergTable.properties().get(WRITE_METADATA_LOCATION);
12691280

12701281
if (metadataLocation != null) {
12711282
return String.format("%s", LocationUtil.stripTrailingSlash(metadataLocation));
@@ -1282,11 +1293,11 @@ public static String metadataLocation(Table icebergTable)
12821293
public static String dataLocation(Table icebergTable)
12831294
{
12841295
Map<String, String> properties = icebergTable.properties();
1285-
String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
1296+
String dataLocation = properties.get(WRITE_DATA_LOCATION);
12861297
if (dataLocation == null) {
1287-
dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH);
1298+
dataLocation = properties.get(OBJECT_STORE_PATH);
12881299
if (dataLocation == null) {
1289-
dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
1300+
dataLocation = properties.get(WRITE_FOLDER_STORAGE_LOCATION);
12901301
if (dataLocation == null) {
12911302
dataLocation = String.format("%s/data", icebergTable.location());
12921303
}

presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java

+87
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.testng.annotations.DataProvider;
3030
import org.testng.annotations.Test;
3131

32+
import java.io.IOException;
3233
import java.nio.file.Path;
3334
import java.util.function.BiConsumer;
3435
import java.util.regex.Matcher;
@@ -47,6 +48,7 @@
4748
import static com.google.common.base.Preconditions.checkArgument;
4849
import static com.google.common.collect.Iterables.getOnlyElement;
4950
import static java.lang.String.format;
51+
import static java.nio.file.Files.createTempDirectory;
5052
import static java.util.Locale.ENGLISH;
5153
import static java.util.Objects.requireNonNull;
5254
import static java.util.stream.Collectors.joining;
@@ -152,6 +154,91 @@ public void testShowCreateTable()
152154
")", schemaName, getLocation(schemaName, "orders")));
153155
}
154156

157+
@Test
158+
public void testTableWithSpecifiedWriteDataLocation()
159+
throws IOException
160+
{
161+
String tableName = "test_table_with_specified_write_data_location";
162+
String dataWriteLocation = createTempDirectory(tableName).toAbsolutePath().toString();
163+
try {
164+
assertUpdate(format("create table %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
165+
assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3);
166+
assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')");
167+
assertUpdate(format("delete from %s where a > 2", tableName), 1);
168+
assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
169+
}
170+
finally {
171+
try {
172+
getQueryRunner().execute("drop table if exists " + tableName);
173+
}
174+
catch (Exception e) {
175+
// ignored for hive catalog compatibility
176+
}
177+
}
178+
}
179+
180+
@Test
181+
public void testPartitionedTableWithSpecifiedWriteDataLocation()
182+
throws IOException
183+
{
184+
String tableName = "test_partitioned_table_with_specified_write_data_location";
185+
String dataWriteLocation = createTempDirectory(tableName).toAbsolutePath().toString();
186+
try {
187+
assertUpdate(format("create table %s(a int, b varchar) with (partitioning = ARRAY['a'], \"write.data.path\" = '%s')", tableName, dataWriteLocation));
188+
assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3);
189+
assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')");
190+
assertUpdate(format("delete from %s where a > 2", tableName), 1);
191+
assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
192+
}
193+
finally {
194+
try {
195+
getQueryRunner().execute("drop table if exists " + tableName);
196+
}
197+
catch (Exception e) {
198+
// ignored for hive catalog compatibility
199+
}
200+
}
201+
}
202+
203+
@Test
204+
public void testShowCreateTableWithSpecifiedWriteDataLocation()
205+
throws IOException
206+
{
207+
String tableName = "test_show_table_with_specified_write_data_location";
208+
String dataWriteLocation = createTempDirectory("test1").toAbsolutePath().toString();
209+
try {
210+
assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
211+
String schemaName = getSession().getSchema().get();
212+
String location = getLocation(schemaName, tableName);
213+
String createTableSql = "CREATE TABLE iceberg.%s.%s (\n" +
214+
" \"a\" integer,\n" +
215+
" \"b\" varchar\n" +
216+
")\n" +
217+
"WITH (\n" +
218+
" delete_mode = 'merge-on-read',\n" +
219+
" format = 'PARQUET',\n" +
220+
" format_version = '2',\n" +
221+
" location = '%s',\n" +
222+
" metadata_delete_after_commit = false,\n" +
223+
" metadata_previous_versions_max = 100,\n" +
224+
" metrics_max_inferred_column = 100,\n" +
225+
" \"read.split.target-size\" = 134217728,\n" +
226+
" \"write.data.path\" = '%s',\n" +
227+
" \"write.update.mode\" = 'merge-on-read'\n" +
228+
")";
229+
assertThat(computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue())
230+
.isEqualTo(format(createTableSql, schemaName, tableName, location, dataWriteLocation));
231+
}
232+
finally {
233+
try {
234+
getQueryRunner().execute("DROP TABLE IF EXISTS " + tableName);
235+
}
236+
catch (Exception e) {
237+
// ignored for hive catalog compatibility
238+
}
239+
}
240+
}
241+
155242
@Test
156243
public void testDecimal()
157244
{

presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java

+36
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.testng.annotations.Test;
3939

4040
import java.io.File;
41+
import java.io.IOException;
4142
import java.util.Map;
4243
import java.util.Optional;
4344

@@ -50,6 +51,7 @@
5051
import static com.google.common.io.MoreFiles.deleteRecursively;
5152
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
5253
import static java.lang.String.format;
54+
import static java.nio.file.Files.createTempDirectory;
5355
import static java.util.Locale.ENGLISH;
5456
import static org.assertj.core.api.Assertions.assertThat;
5557
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -182,6 +184,40 @@ public void testShowCreateTable()
182184
")", schemaName, getLocation(schemaName, "orders")));
183185
}
184186

187+
@Test
188+
public void testShowCreateTableWithSpecifiedWriteDataLocation()
189+
throws IOException
190+
{
191+
String tableName = "test_table_with_specified_write_data_location";
192+
String dataWriteLocation = createTempDirectory("test1").toAbsolutePath().toString();
193+
try {
194+
assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
195+
String schemaName = getSession().getSchema().get();
196+
String location = getLocation(schemaName, tableName);
197+
String createTableSql = "CREATE TABLE iceberg.\"%s\".%s (\n" +
198+
" \"a\" integer,\n" +
199+
" \"b\" varchar\n" +
200+
")\n" +
201+
"WITH (\n" +
202+
" delete_mode = 'merge-on-read',\n" +
203+
" format = 'PARQUET',\n" +
204+
" format_version = '2',\n" +
205+
" location = '%s',\n" +
206+
" metadata_delete_after_commit = false,\n" +
207+
" metadata_previous_versions_max = 100,\n" +
208+
" metrics_max_inferred_column = 100,\n" +
209+
" \"read.split.target-size\" = 134217728,\n" +
210+
" \"write.data.path\" = '%s',\n" +
211+
" \"write.update.mode\" = 'merge-on-read'\n" +
212+
")";
213+
assertThat(computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue())
214+
.isEqualTo(format(createTableSql, schemaName, tableName, location, dataWriteLocation));
215+
}
216+
finally {
217+
assertUpdate(("DROP TABLE IF EXISTS " + tableName));
218+
}
219+
}
220+
185221
@Test
186222
@Override // override due to double quotes around nested namespace
187223
public void testTableComments()

0 commit comments

Comments
 (0)