diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index e4b784ceeaea..d4c18a362135 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -555,29 +555,34 @@ private void startPipe(String pipeName, long creationTime) { } protected void stopPipe(String pipeName, long creationTime) { - final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + + if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) { + return; + } - if (!checkBeforeStopPipe(pipeMeta, pipeName, creationTime)) { + // Get pipe tasks + final Map pipeTasks = + pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta()); + if (pipeTasks == null) { LOGGER.info( - "Stop Pipe: Pipe {} has already been dropped or has not been created. Skip stopping.", - pipeName); + "Pipe {} (creation time = {}) has already been dropped or has not been created. " + + "Skip stopping.", + pipeName, + creationTime); return; } - // 1. Drop the pipe task + // Trigger stop() method for each pipe task by parallel stream final long startTime = System.currentTimeMillis(); - handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName()); - - // 2. Set pipe meta status to STOPPED - pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); - - // 3. create a new pipe with the same pipeMeta - createPipe(pipeMeta); - + pipeTasks.values().parallelStream().forEach(PipeTask::stop); LOGGER.info( "Stop all pipe tasks on Pipe {} successfully within {} ms", pipeName, System.currentTimeMillis() - startTime); + + // Set pipe meta status to STOPPED + existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); } ////////////////////////// Checker //////////////////////////