From 768f0a6da506b17c3173e266380574e04ea80617 Mon Sep 17 00:00:00 2001 From: Carl-Zhou-CN <67902676+Carl-Zhou-CN@users.noreply.github.com> Date: Tue, 30 May 2023 19:02:38 +0800 Subject: [PATCH] [Improve] Add a jobId to the doris label to distinguish between tasks (#4839) Co-authored-by: zhouyao --- .../seatunnel/connectors/doris/sink/DorisSink.java | 11 +++++++++-- .../connectors/doris/sink/writer/DorisSinkWriter.java | 6 ++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index 018eb44bd6f..2c6d6ae7425 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.serialization.Serializer; @@ -54,6 +55,7 @@ public class DorisSink private Config pluginConfig; private SeaTunnelRowType seaTunnelRowType; + private String jobId; @Override public String getPluginName() { @@ -78,6 +80,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { } } + @Override + public void setJobContext(JobContext jobContext) { + this.jobId = jobContext.getJobId(); + } + @Override public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { this.seaTunnelRowType = seaTunnelRowType; @@ -93,7 +100,7 @@ public SinkWriter createWriter( SinkWriter.Context context) throws IOException { DorisSinkWriter dorisSinkWriter = new DorisSinkWriter( - context, Collections.emptyList(), seaTunnelRowType, pluginConfig); + context, Collections.emptyList(), seaTunnelRowType, pluginConfig, jobId); dorisSinkWriter.initializeLoad(Collections.emptyList()); return dorisSinkWriter; } @@ -102,7 +109,7 @@ public SinkWriter createWriter( public SinkWriter restoreWriter( SinkWriter.Context context, List states) throws IOException { DorisSinkWriter dorisWriter = - new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig); + new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig, jobId); dorisWriter.initializeLoad(states); return dorisWriter; } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 744db83e6c2..ac0927f0863 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -78,13 +78,15 @@ public DorisSinkWriter( SinkWriter.Context context, List state, SeaTunnelRowType seaTunnelRowType, - Config pluginConfig) { + Config pluginConfig, + String jobId) { this.dorisConfig = DorisConfig.loadConfig(pluginConfig); this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0; log.info("restore checkpointId {}", lastCheckpointId); log.info("labelPrefix " + dorisConfig.getLabelPrefix()); this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), lastCheckpointId); - this.labelPrefix = dorisConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask(); + this.labelPrefix = + dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask(); this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC()); this.scheduledExecutorService = new ScheduledThreadPoolExecutor(