Skip to content

Commit

Permalink
Pipe: fix connector subtasks can not be stopped and restarted after e…
Browse files Browse the repository at this point in the history
…xception reporting by connector subtasks (apache#11979)
  • Loading branch information
Caideyipi authored Jan 26, 2024
1 parent ebbddf3 commit 40fc15b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,18 @@ public synchronized void onFailure(@NotNull Throwable throwable) {

if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
// We should reconstruct the client before re-submit the subtask
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
return;
}
}

// Handle other exceptions as usual
super.onFailure(throwable);
// Handle exceptions if any available clients exist
// Notice that the PipeRuntimeConnectorCriticalException must be thrown here
// because the upper layer relies on this to stop all the related pipe tasks
// Other exceptions may cause the subtask to stop forever and can not be restarted
super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
}

/** @return true if the pipe task should be stopped, false otherwise */
Expand Down Expand Up @@ -252,7 +256,7 @@ private boolean onPipeConnectionException(Throwable throwable) {
}
}

// Stop current pipe task if failed to reconnect to
// Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
private final BoundedBlockingPendingQueue<Event> pendingQueue;

private int runningTaskCount;
private int aliveTaskCount;
private int registeredTaskCount;

public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
Expand All @@ -46,7 +46,7 @@ public PipeConnectorSubtaskLifeCycle(
this.pendingQueue = pendingQueue;

runningTaskCount = 0;
aliveTaskCount = 0;
registeredTaskCount = 0;
}

public PipeConnectorSubtask getSubtask() {
Expand All @@ -58,57 +58,57 @@ public BoundedBlockingPendingQueue<Event> getPendingQueue() {
}

public synchronized void register() {
if (aliveTaskCount < 0) {
throw new IllegalStateException("aliveTaskCount < 0");
if (registeredTaskCount < 0) {
throw new IllegalStateException("registeredTaskCount < 0");
}

if (aliveTaskCount == 0) {
if (registeredTaskCount == 0) {
executor.register(subtask);
runningTaskCount = 0;
}

aliveTaskCount++;
registeredTaskCount++;
LOGGER.info(
"Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}

/**
* Deregister the subtask. If the subtask is the last one, close the subtask.
*
* <p>Note that this method should be called after the subtask is stopped. Otherwise, the
* runningTaskCount might be inconsistent with the aliveTaskCount because of parallel connector
* scheduling.
* runningTaskCount might be inconsistent with the registeredTaskCount because of parallel
* connector scheduling.
*
* @param pipeNameToDeregister pipe name
* @return true if the subtask is out of life cycle, indicating that the subtask should never be
* used again
* @throws IllegalStateException if aliveTaskCount <= 0
* @throws IllegalStateException if registeredTaskCount <= 0
*/
public synchronized boolean deregister(String pipeNameToDeregister) {
if (aliveTaskCount <= 0) {
throw new IllegalStateException("aliveTaskCount <= 0");
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}

subtask.discardEventsOfPipe(pipeNameToDeregister);

try {
if (aliveTaskCount > 1) {
if (registeredTaskCount > 1) {
return false;
}

close();
// This subtask is out of life cycle, should never be used again
return true;
} finally {
aliveTaskCount--;
registeredTaskCount--;
LOGGER.info(
"Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}
}

Expand All @@ -123,10 +123,10 @@ public synchronized void start() {

runningTaskCount++;
LOGGER.info(
"Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}

public synchronized void stop() {
Expand All @@ -140,10 +140,10 @@ public synchronized void stop() {

runningTaskCount--;
LOGGER.info(
"Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}

@Override
Expand Down

0 comments on commit 40fc15b

Please sign in to comment.