diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index ab9e9b55d89..fae5faa2785 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -73,7 +73,8 @@ public boolean accept(WorkflowContext workflowContext) { } log.info("add startup group listener for groupId [{}]", groupId); - return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); + return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override @@ -141,9 +142,13 @@ public ListenerResult listen(WorkflowContext context) throws Exception { FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); - flinkOperation.start(flinkInfo); - log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, - streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + // only start job for real-time mode + if (InlongConstants.DATASYNC_REALTIME_MODE + .equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) { + flinkOperation.start(flinkInfo); + log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, + streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + } } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 1df2d014bac..0638ecc83a3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -276,7 +276,8 @@ public List listByCondition(AuditRequest request) throws Exception { if (CollectionUtils.isEmpty(request.getAuditIds())) { // properly overwrite audit ids by role and stream config - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) { auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); } else { @@ -436,7 +437,8 @@ private List getAuditIds(String groupId, String streamId, String sourceN } else { auditSet.add(getAuditId(sinkNodeType, true)); InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) { auditSet.add(getAuditId(sourceNodeType, false)); } else { auditSet.add(getAuditId(sinkNodeType, false)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index c96c12c9ff3..c8c087320f4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -98,7 +98,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc // update status of other related configs if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index 55290b948f5..79462f78db2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -82,7 +82,8 @@ public ListenerResult listen(WorkflowContext context) { } // if the inlong group is dataSync mode, the stream source needs to be processed. - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { changeSource4DataSync(groupId, operateType, operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java index 4d285488f60..33ed7f86fdb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java @@ -69,7 +69,6 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc InlongGroupInfo groupInfo = groupService.get(groupId); GroupResourceProcessForm processForm = new GroupResourceProcessForm(); processForm.setGroupInfo(groupInfo); - String username = context.getOperator(); List streamList = streamService.list(groupId); processForm.setStreamInfos(streamList); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index f566c29728e..0ce551d1e63 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -115,8 +115,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc String operator = context.getOperator(); GroupOperateType operateType = groupProcessForm.getGroupOperateType(); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { - log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId); + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { + log.warn("skip to execute QueueResourceListener as sync mode {} (1 for realtime sync, 2 for offline sync) " + + "for groupId={}", groupInfo.getInlongGroupMode(), groupId); if (GroupOperateType.INIT.equals(operateType)) { this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index df41823b84a..85033b44f71 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -66,7 +66,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc // Update status of other related configs streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); streamService.updateWithoutCheck(streamInfo.genRequest(), operator); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 0fd1800ea49..0aacec11fbb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -138,6 +138,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM return; } boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode) || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus()); if (!allowUpdate) { throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 08015a40862..c3be04be851 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -222,7 +222,8 @@ public Map> getSourcesMap(InlongGroupInfo groupInfo, // if the group mode is DATASYNC, just get all related stream sources List streamSources = this.listSource(groupId, null); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { result = streamSources.stream() .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));