From 332bd434d19df614bffc496ba6d64fe7a8f4671c Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 1 Mar 2024 11:08:03 +0800 Subject: [PATCH] [INLONG-9756][Manager] add separator between groupId and streamId in jobName --- .../inlong/manager/plugin/listener/RestartSortListener.java | 3 ++- .../inlong/manager/plugin/listener/StartupSortListener.java | 3 ++- .../inlong/manager/plugin/listener/StartupStreamListener.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java index 66bf88b149..9a95da354a 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java @@ -122,7 +122,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception { } FlinkInfo flinkInfo = new FlinkInfo(); - String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId(); + String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN + + streamInfo.getInlongStreamId(); flinkInfo.setJobName(jobName); String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index d2894ad88f..8fa72f1c4b 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -133,7 +133,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception { FlinkInfo flinkInfo = new FlinkInfo(); - String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId(); + String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN + + streamInfo.getInlongStreamId(); flinkInfo.setJobName(jobName); String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java index b3a341c37d..99c0245168 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java @@ -126,7 +126,8 @@ public ListenerResult listen(WorkflowContext context) throws Exception { FlinkInfo flinkInfo = new FlinkInfo(); - String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId(); + String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN + + streamInfo.getInlongStreamId(); flinkInfo.setJobName(jobName); String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl);