Skip to content

Commit

Permalink
[INLONG-9240][Manager][Sort] Add append mode for the Iceberg connector (
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Nov 15, 2023
1 parent 78c4a76 commit 52498f1
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("Primary key")
private String primaryKey;

@ApiModelProperty("append mode, UPSERT or APPEND")
private String appendMode;

public IcebergSink() {
this.setSinkType(SinkType.ICEBERG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import javax.validation.constraints.Pattern;

/**
* Iceberg sink request.
*/
Expand Down Expand Up @@ -64,4 +66,8 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("Primary key")
private String primaryKey;

@ApiModelProperty("append mode, UPSERT or APPEND")
@Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode")
private String appendMode;

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> con
icebergSink.getPrimaryKey(),
catalogType,
icebergSink.getCatalogUri(),
icebergSink.getWarehouse());
icebergSink.getWarehouse(),
icebergSink.getAppendMode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class IcebergConstant {
public static final String STREAMING = "streaming";
public static final String STARTING_STRATEGY_KEY = "starting-strategy";

public static final String APPEND_MODE_KEY = "appendMode";

/**
* Iceberg supported catalog type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
@JsonProperty("warehouse")
private String warehouse;

@JsonProperty("appendMode")
private String appendMode;

@JsonCreator
public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand All @@ -87,14 +90,16 @@ public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse) {
@JsonProperty("warehouse") String warehouse,
@JsonProperty("appendMode") String appendMode) {
super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
this.primaryKey = primaryKey;
this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
this.uri = uri;
this.warehouse = warehouse;
this.appendMode = appendMode;
}

@Override
Expand All @@ -108,11 +113,12 @@ public Map<String, String> tableOptions() {
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
if (null != uri) {
options.put("uri", uri);
options.put(IcebergConstant.URI_KEY, uri);
}
if (null != warehouse) {
options.put("warehouse", warehouse);
options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
}
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public IcebergLoadNode getTestObject() {
"id",
CatalogType.HIVE,
"thrift://localhost:9083",
"hdfs://localhost:9000/user/iceberg/warehouse");
"hdfs://localhost:9000/user/iceberg/warehouse",
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private IcebergLoadNode buildIcebergLoadNodeWithHadoopCatalog() {
null,
CatalogType.HADOOP,
null,
"hdfs://localhost:9000/iceberg/warehouse");
"hdfs://localhost:9000/iceberg/warehouse",
null);
}

private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
Expand Down Expand Up @@ -139,7 +140,8 @@ private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
null,
CatalogType.HIVE,
"thrift://localhost:9083",
"/hive/warehouse");
"/hive/warehouse",
null);
}

/**
Expand Down

0 comments on commit 52498f1

Please sign in to comment.