Skip to content

Commit

Permalink
Pipe: let non enriched event forever retry when exception occurred to…
Browse files Browse the repository at this point in the history
… avoid subtask fake running status recorded at data node task agent (apache#11929)
  • Loading branch information
SteveYurongSu authored Jan 19, 2024
1 parent 4cc5b2b commit 771c096
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,29 @@ protected PipeDataNodeSubtask(String taskID, long creationTime) {
/**
 * Callback invoked when the execution of this subtask fails.
 *
 * <p>If the subtask has already been closed, the failure is only logged (with the throwable)
 * and {@code releaseLastEvent(false)} is called. Otherwise the failure is dispatched according
 * to the type of the last event: {@code EnrichedEvent} instances go to
 * {@link #onEnrichedEventFailure(Throwable)}, everything else goes to
 * {@link #onNonEnrichedEventFailure(Throwable)}.
 *
 * @param throwable the failure that interrupted the subtask
 */
@Override
public synchronized void onFailure(@NotNull Throwable throwable) {
  if (isClosed.get()) {
    // Log once, keeping the throwable so the cause is not lost.
    LOGGER.info("onFailure in pipe subtask, ignored because pipe is dropped.", throwable);
    releaseLastEvent(false);
    return;
  }

  if (lastEvent instanceof EnrichedEvent) {
    onEnrichedEventFailure(throwable);
  } else {
    onNonEnrichedEventFailure(throwable);
  }

  // Although the pipe task may be stopped, the last event is not released here because it
  // must be kept for the next retry. If the user restarts the task, the last event will be
  // processed again. The last event will be released when the task is dropped or the
  // process is running normally.
}

private void onEnrichedEventFailure(@NotNull Throwable throwable) {
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {}({}), because of {}. Will retry for {} times.",
"Failed to execute subtask {} (creation time: {}, simple class: {}), because of {}. Will retry for {} times.",
taskID,
creationTime,
this.getClass().getSimpleName(),
throwable.getMessage(),
MAX_RETRY_TIMES,
Expand All @@ -59,66 +73,85 @@ public synchronized void onFailure(@NotNull Throwable throwable) {
if (retryCount.get() < MAX_RETRY_TIMES) {
retryCount.incrementAndGet();
LOGGER.warn(
"Retry executing subtask {}({}), retry count [{}/{}]",
"Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}]",
taskID,
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
MAX_RETRY_TIMES);
try {
Thread.sleep(1000L * retryCount.get());
} catch (InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {}({})",
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
taskID,
this.getClass().getSimpleName());
creationTime,
this.getClass().getSimpleName(),
e);
Thread.currentThread().interrupt();
}

submitSelf();
} else {
final String errorMessage =
String.format(
"Failed to execute subtask %s(%s), "
"Failed to execute subtask %s (creation time: %s, simple class: %s), "
+ "retry count exceeds the max retry times %d, last exception: %s, root cause: %s",
taskID,
creationTime,
this.getClass().getSimpleName(),
retryCount.get(),
throwable.getMessage(),
ErrorHandlingUtils.getRootCause(throwable).getMessage());
LOGGER.warn(errorMessage, throwable);
((EnrichedEvent) lastEvent)
.reportException(
throwable instanceof PipeRuntimeException
? (PipeRuntimeException) throwable
: new PipeRuntimeCriticalException(errorMessage));
LOGGER.warn(
"The last event is an instance of EnrichedEvent, so the exception is reported. "
+ "Stopping current pipe subtask {} (creation time: {}, simple class: {}) locally... "
+ "Status shown when query the pipe will be 'STOPPED'. "
+ "Please restart the task by executing 'START PIPE' manually if needed.",
taskID,
creationTime,
this.getClass().getSimpleName(),
throwable);
}
}

if (lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
.reportException(
throwable instanceof PipeRuntimeException
? (PipeRuntimeException) throwable
: new PipeRuntimeCriticalException(errorMessage));
LOGGER.warn(
"The last event is an instance of EnrichedEvent, so the exception is reported. "
+ "Stopping current pipe task {}({}) locally... "
+ "Status shown when query the pipe will be 'STOPPED'. "
+ "Please restart the task by executing 'START PIPE' manually if needed.",
taskID,
this.getClass().getSimpleName(),
throwable);
} else {
LOGGER.error(
"The last event is not an instance of EnrichedEvent, "
+ "so the exception cannot be reported. "
+ "Stopping current pipe task {}({}) locally... "
+ "Status shown when query the pipe will be 'RUNNING' "
+ "instead of 'STOPPED', but the task is actually stopped. "
+ "Please restart the task by executing 'START PIPE' manually if needed.",
taskID,
this.getClass().getSimpleName(),
throwable);
}
/**
 * Failure handling used when the last event is not an {@code EnrichedEvent}.
 *
 * <p>This handler never gives up: it logs the failure on the first attempt, bumps the retry
 * counter, sleeps for {@code 1000 ms * retryCount} capped at 10 seconds, and then resubmits
 * the subtask through {@code submitSelf()}.
 *
 * @param throwable the failure that interrupted the subtask
 */
private void onNonEnrichedEventFailure(@NotNull Throwable throwable) {
  if (retryCount.get() == 0) {
    LOGGER.warn(
        "Failed to execute subtask {} (creation time: {}, simple class: {}), "
            + "because of {}. Will retry forever.",
        taskID,
        creationTime,
        this.getClass().getSimpleName(),
        throwable.getMessage(),
        throwable);
  }

  // The last event is not released in this method; presumably it is kept so that the
  // resubmitted subtask can process it again (NOTE(review): confirm against submitSelf()).
  retryCount.incrementAndGet();
  LOGGER.warn(
      "Retry executing subtask {} (creation time: {}, simple class: {}), retry count {}",
      taskID,
      creationTime,
      this.getClass().getSimpleName(),
      retryCount.get());
  try {
    // Linear back-off, capped at 10 seconds between attempts.
    Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
  } catch (InterruptedException e) {
    // Pass the exception as the trailing argument so its stack trace is logged,
    // matching the interrupted-sleep handling of the enriched-event path.
    LOGGER.warn(
        "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
        taskID,
        creationTime,
        this.getClass().getSimpleName(),
        e);
    // Restore the interrupt flag for code further up the call stack.
    Thread.currentThread().interrupt();
  }

  submitSelf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,91 +200,91 @@ public synchronized void onFailure(@NotNull Throwable throwable) {
isSubmitted = false;

if (isClosed.get()) {
LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.");
LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.", throwable);
releaseLastEvent(false);
return;
}

// Retry to connect to the target system if the connection is broken
if (throwable instanceof PipeConnectionException) {
LOGGER.warn(
"PipeConnectionException occurred, retrying to connect to the target system...",
throwable);
// Retry to connect to the target system if the connection is broken
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
return;
}
}

int retry = 0;
while (retry < MAX_RETRY_TIMES) {
// Handle other exceptions as usual
super.onFailure(throwable);
}

/** @return true if the pipe task should be stopped, false otherwise */
private boolean onPipeConnectionException(Throwable throwable) {
LOGGER.warn(
"PipeConnectionException occurred, {} retries to handshake with the target system.",
outputPipeConnector.getClass().getName(),
throwable);

int retry = 0;
while (retry < MAX_RETRY_TIMES) {
try {
outputPipeConnector.handshake();
LOGGER.info(
"{} handshakes with the target system successfully.",
outputPipeConnector.getClass().getName());
break;
} catch (Exception e) {
retry++;
LOGGER.warn(
"{} failed to handshake with the target system for {} times, "
+ "will retry at most {} times.",
outputPipeConnector.getClass().getName(),
retry,
MAX_RETRY_TIMES,
e);
try {
outputPipeConnector.handshake();
LOGGER.info("Successfully reconnected to the target system.");
break;
} catch (Exception e) {
retry++;
LOGGER.warn(
"Failed to reconnect to the target system, retrying ... "
+ "after [{}/{}] time(s) retries.",
retry,
MAX_RETRY_TIMES,
e);
try {
Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, perhaps need to check "
+ "whether the thread is interrupted.",
interruptedException);
Thread.currentThread().interrupt();
}
Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, will retry to handshake with the target system.",
interruptedException);
Thread.currentThread().interrupt();
}
}
}

// Stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
// times
if (retry == MAX_RETRY_TIMES) {
if (lastEvent instanceof EnrichedEvent) {
LOGGER.warn(
"Failed to reconnect to the target system after {} times, "
+ "stopping current pipe task {}... "
+ "Status shown when query the pipe will be 'STOPPED'. "
+ "Please restart the task by executing 'START PIPE' manually if needed.",
MAX_RETRY_TIMES,
taskID,
throwable);

((EnrichedEvent) lastEvent)
.reportException(
new PipeRuntimeConnectorCriticalException(
throwable.getMessage()
+ ", root cause: "
+ ErrorHandlingUtils.getRootCause(throwable).getMessage()));
} else {
LOGGER.error(
"Failed to reconnect to the target system after {} times, "
+ "stopping current pipe task {} locally... "
+ "Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', "
+ "but the task is actually stopped. "
+ "Please restart the task by executing 'START PIPE' manually if needed.",
MAX_RETRY_TIMES,
taskID,
throwable);
}

// Although the pipe task will be stopped, we still don't release the last event here
// Because we need to keep it for the next retry. If user wants to restart the task,
// the last event will be processed again. The last event will be released when the task
// is dropped or the process is running normally.

// Stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
return;
}
} else {
// Stop current pipe task if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
.reportException(
new PipeRuntimeConnectorCriticalException(
throwable.getMessage()
+ ", root cause: "
+ ErrorHandlingUtils.getRootCause(throwable).getMessage()));
LOGGER.warn(
"A non-PipeConnectionException occurred, exception message: {}",
throwable.getMessage(),
"{} failed to handshake with the target system after {} times, "
+ "stopping current subtask {} (creation time: {}, simple class: {}). "
+ "Status shown when query the pipe will be 'STOPPED'. "
+ "Please restart the task by executing 'START PIPE' manually if needed.",
outputPipeConnector.getClass().getName(),
MAX_RETRY_TIMES,
taskID,
creationTime,
this.getClass().getSimpleName(),
throwable);

// Although the pipe task will be stopped, we still don't release the last event here
// Because we need to keep it for the next retry. If user wants to restart the task,
// the last event will be processed again. The last event will be released when the task
// is dropped or the process is running normally.

// Stop current pipe task if failed to reconnect to the target system after MAX_RETRY_TIMES
return true;
}

// Handle other exceptions as usual
super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
// For non enriched event, forever retry.
// For enriched event, retry if connection is set up successfully.
return false;
}

/**
Expand Down

0 comments on commit 771c096

Please sign in to comment.