diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java index e7bb7325d04..19b988b79d3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java @@ -72,6 +72,9 @@ public class IcebergSink extends StreamSink { @ApiModelProperty("Primary key") private String primaryKey; + @ApiModelProperty("append mode, UPSERT or APPEND") + private String appendMode; + public IcebergSink() { this.setSinkType(SinkType.ICEBERG); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java index 129790f90cf..aa3c606b3f5 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java @@ -27,6 +27,8 @@ import lombok.EqualsAndHashCode; import lombok.ToString; +import javax.validation.constraints.Pattern; + /** * Iceberg sink request. */ @@ -64,4 +66,8 @@ public class IcebergSinkRequest extends SinkRequest { @ApiModelProperty("Primary key") private String primaryKey; + @ApiModelProperty("append mode, UPSERT or APPEND") + @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode") + private String appendMode; + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java index 06cd989198f..f922b289ffa 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java @@ -96,7 +96,8 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map con icebergSink.getPrimaryKey(), catalogType, icebergSink.getCatalogUri(), - icebergSink.getWarehouse()); + icebergSink.getWarehouse(), + icebergSink.getAppendMode()); } @Override diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java index 2cce35fb9ba..ce9b81b8a91 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java @@ -36,6 +36,8 @@ public class IcebergConstant { public static final String STREAMING = "streaming"; public static final String STARTING_STRATEGY_KEY = "starting-strategy"; + public static final String APPEND_MODE_KEY = "appendMode"; + /** * Iceberg supported catalog type */ diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java index 6ffaf5f2da7..7f76891dfdb 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java @@ -73,6 +73,9 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, @JsonProperty("warehouse") private String warehouse; + @JsonProperty("appendMode") + private String appendMode; + @JsonCreator public IcebergLoadNode(@JsonProperty("id") String id, @JsonProperty("name") String name, @@ -87,7 +90,8 @@ public IcebergLoadNode(@JsonProperty("id") String id, @JsonProperty("primaryKey") String primaryKey, @JsonProperty("catalogType") IcebergConstant.CatalogType catalogType, @JsonProperty("uri") String uri, - @JsonProperty("warehouse") String warehouse) { + @JsonProperty("warehouse") String warehouse, + @JsonProperty("appendMode") String appendMode) { super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties); this.tableName = Preconditions.checkNotNull(tableName, "table name is null"); this.dbName = Preconditions.checkNotNull(dbName, "db name is null"); @@ -95,6 +99,7 @@ public IcebergLoadNode(@JsonProperty("id") String id, this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType; this.uri = uri; this.warehouse = warehouse; + this.appendMode = appendMode; } @Override @@ -108,11 +113,12 @@ public Map tableOptions() { options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName); options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name()); options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name()); + options.put(IcebergConstant.APPEND_MODE_KEY, appendMode); if (null != uri) { - options.put("uri", uri); + options.put(IcebergConstant.URI_KEY, uri); } if (null != warehouse) { - options.put("warehouse", warehouse); + options.put(IcebergConstant.WAREHOUSE_KEY, warehouse); } return options; } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java index 69f20312355..24d5b39730a 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java @@ -45,6 +45,7 @@ public IcebergLoadNode getTestObject() { "id", CatalogType.HIVE, "thrift://localhost:9083", - "hdfs://localhost:9000/user/iceberg/warehouse"); + "hdfs://localhost:9000/user/iceberg/warehouse", + null); } } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java index 41d5701fdf2..fc0b889db0b 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java @@ -100,7 +100,8 @@ private IcebergLoadNode buildIcebergLoadNodeWithHadoopCatalog() { null, CatalogType.HADOOP, null, - "hdfs://localhost:9000/iceberg/warehouse"); + "hdfs://localhost:9000/iceberg/warehouse", + null); } private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() { @@ -139,7 +140,8 @@ private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() { null, CatalogType.HIVE, "thrift://localhost:9083", - "/hive/warehouse"); + "/hive/warehouse", + null); } /**