Skip to content

Commit 2005b0b

Browse files
ZacBlancoevanvdiaavsned
committed
Add support for UPDATE in iceberg
This commit allows users to perform row-level updates when using the Iceberg connector with Java-based workers. This is achieved by improving on the IcebergUpdatablePageSource to implement the updateRows method. The implementation passes a generated row ID column as a field in the page required by updateRows. Then during updateRows, generated a positionDelete file entry for the row ID, and also writes the row's updated value to a new page sink for the newly updated data. These new files are then commited in a rowDelta transaction within the Iceberg connector metadata after processing is complete. Co-Authored-By: Nidhin Varghese <Nidhin.Varghese1@ibm.com> Co-Authored-By: Anoop V S <anoop.v.s@ibm.com>
1 parent 4e3cf23 commit 2005b0b

40 files changed

+1183
-227
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

+24
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,30 @@ For example, ``DESCRIBE`` from the partitioned Iceberg table ``customer``:
14141414
comment | varchar | |
14151415
(3 rows)
14161416

1417+
UPDATE
1418+
^^^^^^
1419+
1420+
The Iceberg connector supports :doc:`../sql/update` operations on Iceberg
1421+
tables. Only some tables support updates. These tables must be at minimum format
1422+
version 2, and the ``write.update.mode`` must be set to `merge-on-read`.
1423+
1424+
.. code-block:: sql
1425+
1426+
UPDATE region SET name = 'EU', comment = 'Europe' WHERE regionkey = 1;
1427+
1428+
.. code-block:: text
1429+
1430+
UPDATE: 1 row
1431+
1432+
Query 20250204_010341_00021_ymwi5, FINISHED, 2 nodes
1433+
1434+
The query returns an error if the table does not meet the requirements for
1435+
updates.
1436+
1437+
.. code-block:: text
1438+
1439+
Query 20250204_010445_00022_ymwi5 failed: Iceberg table updates require at least format version 2 and update mode must be merge-on-read
1440+
14171441
Schema Evolution
14181442
----------------
14191443

presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public int hashCode()
103103
@Override
104104
public String toString()
105105
{
106-
return id + ":" + name;
106+
return id + ":" + name + ":" + typeCategory + ":" + children;
107107
}
108108

109109
public enum TypeCategory

presto-iceberg/src/main/java/com/facebook/presto/iceberg/CommitTaskData.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class CommitTaskData
2929
private final Optional<String> partitionDataJson;
3030
private final FileFormat fileFormat;
3131
private final Optional<String> referencedDataFile;
32+
private final FileContent content;
3233

3334
@JsonCreator
3435
public CommitTaskData(
@@ -38,7 +39,8 @@ public CommitTaskData(
3839
@JsonProperty("partitionSpecJson") int partitionSpecId,
3940
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
4041
@JsonProperty("fileFormat") FileFormat fileFormat,
41-
@JsonProperty("referencedDataFile") String referencedDataFile)
42+
@JsonProperty("referencedDataFile") String referencedDataFile,
43+
@JsonProperty("content") FileContent content)
4244
{
4345
this.path = requireNonNull(path, "path is null");
4446
this.fileSizeInBytes = fileSizeInBytes;
@@ -47,6 +49,7 @@ public CommitTaskData(
4749
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
4850
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
4951
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
52+
this.content = requireNonNull(content, "content is null");
5053
}
5154

5255
@JsonProperty
@@ -90,4 +93,10 @@ public Optional<String> getReferencedDataFile()
9093
{
9194
return referencedDataFile;
9295
}
96+
97+
@JsonProperty
98+
public FileContent getContent()
99+
{
100+
return content;
101+
}
93102
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.spi.ConnectorPageSource;
17+
18+
import java.util.Optional;
19+
20+
import static java.util.Objects.requireNonNull;
21+
22+
public class ConnectorPageSourceWithRowPositions
23+
{
24+
private final ConnectorPageSource delegate;
25+
private final Optional<Long> startRowPosition;
26+
private final Optional<Long> endRowPosition;
27+
28+
public ConnectorPageSourceWithRowPositions(
29+
ConnectorPageSource delegate,
30+
Optional<Long> startRowPosition,
31+
Optional<Long> endRowPosition)
32+
{
33+
this.delegate = requireNonNull(delegate, "delegate is null");
34+
this.startRowPosition = requireNonNull(startRowPosition, "startRowPosition is null");
35+
this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null");
36+
}
37+
38+
public ConnectorPageSource getDelegate()
39+
{
40+
return delegate;
41+
}
42+
43+
public Optional<Long> getStartRowPosition()
44+
{
45+
return startRowPosition;
46+
}
47+
48+
public Optional<Long> getEndRowPosition()
49+
{
50+
return endRowPosition;
51+
}
52+
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java

+19
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,23 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for
6767

6868
return prestoFileFormat;
6969
}
70+
71+
public org.apache.iceberg.FileFormat toIceberg()
72+
{
73+
org.apache.iceberg.FileFormat fileFormat;
74+
switch (this) {
75+
case ORC:
76+
fileFormat = org.apache.iceberg.FileFormat.ORC;
77+
break;
78+
case PARQUET:
79+
fileFormat = org.apache.iceberg.FileFormat.PARQUET;
80+
break;
81+
case AVRO:
82+
fileFormat = org.apache.iceberg.FileFormat.AVRO;
83+
break;
84+
default:
85+
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this);
86+
}
87+
return fileFormat;
88+
}
7089
}

0 commit comments

Comments
 (0)