diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java index 1dbbde4ed0b6..9b434423c4f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeDataNodeSubtask.java @@ -41,15 +41,29 @@ protected PipeDataNodeSubtask(String taskID, long creationTime) { @Override public synchronized void onFailure(@NotNull Throwable throwable) { if (isClosed.get()) { - LOGGER.info("onFailure in pipe subtask, ignored because pipe is dropped."); + LOGGER.info("onFailure in pipe subtask, ignored because pipe is dropped.", throwable); releaseLastEvent(false); return; } + if (lastEvent instanceof EnrichedEvent) { + onEnrichedEventFailure(throwable); + } else { + onNonEnrichedEventFailure(throwable); + } + + // Although the pipe task will be stopped, we still don't release the last event here + // Because we need to keep it for the next retry. If user wants to restart the task, + // the last event will be processed again. The last event will be released when the task + // is dropped or the process is running normally. + } + + private void onEnrichedEventFailure(@NotNull Throwable throwable) { if (retryCount.get() == 0) { LOGGER.warn( - "Failed to execute subtask {}({}), because of {}. Will retry for {} times.", + "Failed to execute subtask {} (creation time: {}, simple class: {}), because of {}. Will retry for {} times.", taskID, + creationTime, this.getClass().getSimpleName(), throwable.getMessage(), MAX_RETRY_TIMES, @@ -59,8 +73,9 @@ public synchronized void onFailure(@NotNull Throwable throwable) { if (retryCount.get() < MAX_RETRY_TIMES) { retryCount.incrementAndGet(); LOGGER.warn( - "Retry executing subtask {}({}), retry count [{}/{}]", + "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}]", taskID, + creationTime, this.getClass().getSimpleName(), retryCount.get(), MAX_RETRY_TIMES); @@ -68,9 +83,11 @@ public synchronized void onFailure(@NotNull Throwable throwable) { Thread.sleep(1000L * retryCount.get()); } catch (InterruptedException e) { LOGGER.warn( - "Interrupted when retrying to execute subtask {}({})", + "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})", taskID, - this.getClass().getSimpleName()); + creationTime, + this.getClass().getSimpleName(), + e); Thread.currentThread().interrupt(); } @@ -78,47 +95,63 @@ public synchronized void onFailure(@NotNull Throwable throwable) { } else { final String errorMessage = String.format( - "Failed to execute subtask %s(%s), " + "Failed to execute subtask %s (creation time: %s, simple class: %s), " + "retry count exceeds the max retry times %d, last exception: %s, root cause: %s", taskID, + creationTime, this.getClass().getSimpleName(), retryCount.get(), throwable.getMessage(), ErrorHandlingUtils.getRootCause(throwable).getMessage()); LOGGER.warn(errorMessage, throwable); + ((EnrichedEvent) lastEvent) + .reportException( + throwable instanceof PipeRuntimeException + ? (PipeRuntimeException) throwable + : new PipeRuntimeCriticalException(errorMessage)); + LOGGER.warn( + "The last event is an instance of EnrichedEvent, so the exception is reported. " + + "Stopping current pipe subtask {} (creation time: {}, simple class: {}) locally... " + + "Status shown when query the pipe will be 'STOPPED'. " + + "Please restart the task by executing 'START PIPE' manually if needed.", + taskID, + creationTime, + this.getClass().getSimpleName(), + throwable); + } + } - if (lastEvent instanceof EnrichedEvent) { - ((EnrichedEvent) lastEvent) - .reportException( - throwable instanceof PipeRuntimeException - ? (PipeRuntimeException) throwable - : new PipeRuntimeCriticalException(errorMessage)); - LOGGER.warn( - "The last event is an instance of EnrichedEvent, so the exception is reported. " - + "Stopping current pipe task {}({}) locally... " - + "Status shown when query the pipe will be 'STOPPED'. " - + "Please restart the task by executing 'START PIPE' manually if needed.", - taskID, - this.getClass().getSimpleName(), - throwable); - } else { - LOGGER.error( - "The last event is not an instance of EnrichedEvent, " - + "so the exception cannot be reported. " - + "Stopping current pipe task {}({}) locally... " - + "Status shown when query the pipe will be 'RUNNING' " - + "instead of 'STOPPED', but the task is actually stopped. " - + "Please restart the task by executing 'START PIPE' manually if needed.", - taskID, - this.getClass().getSimpleName(), - throwable); - } + private void onNonEnrichedEventFailure(@NotNull Throwable throwable) { + if (retryCount.get() == 0) { + LOGGER.warn( + "Failed to execute subtask {} (creation time: {}, simple class: {}), " + + "because of {}. Will retry forever.", + taskID, + creationTime, + this.getClass().getSimpleName(), + throwable.getMessage(), + throwable); + } - // Although the pipe task will be stopped, we still don't release the last event here - // Because we need to keep it for the next retry. If user wants to restart the task, - // the last event will be processed again. The last event will be released when the task - // is dropped or the process is running normally. + retryCount.incrementAndGet(); + LOGGER.warn( + "Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}", + taskID, + creationTime, + this.getClass().getSimpleName(), + retryCount.get()); + try { + Thread.sleep(Math.min(1000L * retryCount.get(), 10000)); + } catch (InterruptedException e) { + LOGGER.warn( + "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})", + taskID, + creationTime, + this.getClass().getSimpleName()); + Thread.currentThread().interrupt(); } + + submitSelf(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index 184c706be1c3..061944af1a08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -200,91 +200,91 @@ public synchronized void onFailure(@NotNull Throwable throwable) { isSubmitted = false; if (isClosed.get()) { - LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped."); + LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.", throwable); releaseLastEvent(false); return; } - // Retry to connect to the target system if the connection is broken if (throwable instanceof PipeConnectionException) { - LOGGER.warn( - "PipeConnectionException occurred, retrying to connect to the target system...", - throwable); + // Retry to connect to the target system if the connection is broken + if (onPipeConnectionException(throwable)) { + // return if the pipe task should be stopped + return; + } + } - int retry = 0; - while (retry < MAX_RETRY_TIMES) { + // Handle other exceptions as usual + super.onFailure(throwable); + } + + /** @return true if the pipe task should be stopped, false otherwise */ + private boolean onPipeConnectionException(Throwable throwable) { + LOGGER.warn( + "PipeConnectionException occurred, {} retries to handshake with the target system.", + outputPipeConnector.getClass().getName(), + throwable); + + int retry = 0; + while (retry < MAX_RETRY_TIMES) { + try { + outputPipeConnector.handshake(); + LOGGER.info( + "{} handshakes with the target system successfully.", + outputPipeConnector.getClass().getName()); + break; + } catch (Exception e) { + retry++; + LOGGER.warn( + "{} failed to handshake with the target system for {} times, " + + "will retry at most {} times.", + outputPipeConnector.getClass().getName(), + retry, + MAX_RETRY_TIMES, + e); try { - outputPipeConnector.handshake(); - LOGGER.info("Successfully reconnected to the target system."); - break; - } catch (Exception e) { - retry++; - LOGGER.warn( - "Failed to reconnect to the target system, retrying ... " - + "after [{}/{}] time(s) retries.", - retry, - MAX_RETRY_TIMES, - e); - try { - Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); - } catch (InterruptedException interruptedException) { - LOGGER.info( - "Interrupted while sleeping, perhaps need to check " - + "whether the thread is interrupted.", - interruptedException); - Thread.currentThread().interrupt(); - } + Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()); + } catch (InterruptedException interruptedException) { + LOGGER.info( + "Interrupted while sleeping, will retry to handshake with the target system.", + interruptedException); + Thread.currentThread().interrupt(); } } + } - // Stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES - // times - if (retry == MAX_RETRY_TIMES) { - if (lastEvent instanceof EnrichedEvent) { - LOGGER.warn( - "Failed to reconnect to the target system after {} times, " - + "stopping current pipe task {}... " - + "Status shown when query the pipe will be 'STOPPED'. " - + "Please restart the task by executing 'START PIPE' manually if needed.", - MAX_RETRY_TIMES, - taskID, - throwable); - - ((EnrichedEvent) lastEvent) - .reportException( - new PipeRuntimeConnectorCriticalException( - throwable.getMessage() - + ", root cause: " - + ErrorHandlingUtils.getRootCause(throwable).getMessage())); - } else { - LOGGER.error( - "Failed to reconnect to the target system after {} times, " - + "stopping current pipe task {} locally... " - + "Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', " - + "but the task is actually stopped. " - + "Please restart the task by executing 'START PIPE' manually if needed.", - MAX_RETRY_TIMES, - taskID, - throwable); - } - - // Although the pipe task will be stopped, we still don't release the last event here - // Because we need to keep it for the next retry. If user wants to restart the task, - // the last event will be processed again. The last event will be released when the task - // is dropped or the process is running normally. - - // Stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES - return; - } - } else { + // Stop current pipe task if failed to reconnect to + // the target system after MAX_RETRY_TIMES times + if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) { + ((EnrichedEvent) lastEvent) + .reportException( + new PipeRuntimeConnectorCriticalException( + throwable.getMessage() + + ", root cause: " + + ErrorHandlingUtils.getRootCause(throwable).getMessage())); LOGGER.warn( - "A non-PipeConnectionException occurred, exception message: {}", - throwable.getMessage(), + "{} failed to handshake with the target system after {} times, " + + "stopping current subtask {} (creation time: {}, simple class: {}). " + + "Status shown when query the pipe will be 'STOPPED'. " + + "Please restart the task by executing 'START PIPE' manually if needed.", + outputPipeConnector.getClass().getName(), + MAX_RETRY_TIMES, + taskID, + creationTime, + this.getClass().getSimpleName(), throwable); + + // Although the pipe task will be stopped, we still don't release the last event here + // Because we need to keep it for the next retry. If user wants to restart the task, + // the last event will be processed again. The last event will be released when the task + // is dropped or the process is running normally. + + // Stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES + return true; } - // Handle other exceptions as usual - super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage())); + // For non enriched event, forever retry. + // For enriched event, retry if connection is set up successfully. + return false; } /**