Skip to content

Commit

Permalink
Pipe: stop pipe using restarting strategy to unpin the wal's referenc…
Browse files Browse the repository at this point in the history
…e count to avoid WAL stacking (apache#11971)
  • Loading branch information
MiniSho authored Jan 30, 2024
1 parent 0ad17bf commit d0928eb
Showing 1 changed file with 13 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,34 +555,29 @@ private void startPipe(String pipeName, long creationTime) {
}

protected void stopPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);

if (!checkBeforeStopPipe(existedPipeMeta, pipeName, creationTime)) {
return;
}
final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);

// Get pipe tasks
final Map<TConsensusGroupId, PipeTask> pipeTasks =
pipeTaskManager.getPipeTasks(existedPipeMeta.getStaticMeta());
if (pipeTasks == null) {
if (!checkBeforeStopPipe(pipeMeta, pipeName, creationTime)) {
LOGGER.info(
"Pipe {} (creation time = {}) has already been dropped or has not been created. "
+ "Skip stopping.",
pipeName,
creationTime);
"Stop Pipe: Pipe {} has already been dropped or has not been created. Skip stopping.",
pipeName);
return;
}

// Trigger stop() method for each pipe task by parallel stream
// 1. Drop the pipe task
final long startTime = System.currentTimeMillis();
pipeTasks.values().parallelStream().forEach(PipeTask::stop);
handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());

// 2. Set pipe meta status to STOPPED
pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);

// 3. create a new pipe with the same pipeMeta
createPipe(pipeMeta);

LOGGER.info(
"Stop all pipe tasks on Pipe {} successfully within {} ms",
pipeName,
System.currentTimeMillis() - startTime);

// Set pipe meta status to STOPPED
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}

////////////////////////// Checker //////////////////////////
Expand Down

0 comments on commit d0928eb

Please sign in to comment.