diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index 967ca2d6b86e..f472a07fa5f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -207,14 +207,18 @@ public synchronized void onFailure(@NotNull Throwable throwable) { if (throwable instanceof PipeConnectionException) { // Retry to connect to the target system if the connection is broken + // We should reconstruct the client before re-submit the subtask if (onPipeConnectionException(throwable)) { // return if the pipe task should be stopped return; } } - // Handle other exceptions as usual - super.onFailure(throwable); + // Handle exceptions if any available clients exist + // Notice that the PipeRuntimeConnectorCriticalException must be thrown here + // because the upper layer relies on this to stop all the related pipe tasks + // Other exceptions may cause the subtask to stop forever and can not be restarted + super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage())); } /** @return true if the pipe task should be stopped, false otherwise */ @@ -252,7 +256,7 @@ private boolean onPipeConnectionException(Throwable throwable) { } } - // Stop current pipe task if failed to reconnect to + // Stop current pipe task directly if failed to reconnect to // the target system after MAX_RETRY_TIMES times if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) { ((EnrichedEvent) lastEvent) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java index 6759dbdbaa21..6600fae8bc3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java @@ -35,7 +35,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable { private final BoundedBlockingPendingQueue pendingQueue; private int runningTaskCount; - private int aliveTaskCount; + private int registeredTaskCount; public PipeConnectorSubtaskLifeCycle( PipeConnectorSubtaskExecutor executor, @@ -46,7 +46,7 @@ public PipeConnectorSubtaskLifeCycle( this.pendingQueue = pendingQueue; runningTaskCount = 0; - aliveTaskCount = 0; + registeredTaskCount = 0; } public PipeConnectorSubtask getSubtask() { @@ -58,44 +58,44 @@ public BoundedBlockingPendingQueue getPendingQueue() { } public synchronized void register() { - if (aliveTaskCount < 0) { - throw new IllegalStateException("aliveTaskCount < 0"); + if (registeredTaskCount < 0) { + throw new IllegalStateException("registeredTaskCount < 0"); } - if (aliveTaskCount == 0) { + if (registeredTaskCount == 0) { executor.register(subtask); runningTaskCount = 0; } - aliveTaskCount++; + registeredTaskCount++; LOGGER.info( - "Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}", + "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}", subtask, runningTaskCount, - aliveTaskCount); + registeredTaskCount); } /** * Deregister the subtask. If the subtask is the last one, close the subtask. * *

Note that this method should be called after the subtask is stopped. Otherwise, the - * runningTaskCount might be inconsistent with the aliveTaskCount because of parallel connector - * scheduling. + * runningTaskCount might be inconsistent with the registeredTaskCount because of parallel + * connector scheduling. * * @param pipeNameToDeregister pipe name * @return true if the subtask is out of life cycle, indicating that the subtask should never be * used again - * @throws IllegalStateException if aliveTaskCount <= 0 + * @throws IllegalStateException if registeredTaskCount <= 0 */ public synchronized boolean deregister(String pipeNameToDeregister) { - if (aliveTaskCount <= 0) { - throw new IllegalStateException("aliveTaskCount <= 0"); + if (registeredTaskCount <= 0) { + throw new IllegalStateException("registeredTaskCount <= 0"); } subtask.discardEventsOfPipe(pipeNameToDeregister); try { - if (aliveTaskCount > 1) { + if (registeredTaskCount > 1) { return false; } @@ -103,12 +103,12 @@ public synchronized boolean deregister(String pipeNameToDeregister) { // This subtask is out of life cycle, should never be used again return true; } finally { - aliveTaskCount--; + registeredTaskCount--; LOGGER.info( - "Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}", + "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}", subtask, runningTaskCount, - aliveTaskCount); + registeredTaskCount); } } @@ -123,10 +123,10 @@ public synchronized void start() { runningTaskCount++; LOGGER.info( - "Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}", + "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}", subtask, runningTaskCount, - aliveTaskCount); + registeredTaskCount); } public synchronized void stop() { @@ -140,10 +140,10 @@ public synchronized void stop() { runningTaskCount--; LOGGER.info( - "Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}", + "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}", subtask, runningTaskCount, - aliveTaskCount); + registeredTaskCount); } @Override