Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: fix connector subtasks can not be stopped and restarted after exception reporting by connector subtasks #11979

Merged
merged 7 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,18 @@ public synchronized void onFailure(@NotNull Throwable throwable) {

if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
// We should reconstruct the client before re-submit the subtask
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
return;
}
}

// Handle other exceptions as usual
super.onFailure(throwable);
// Handle exceptions if any available clients exist
// Notice that the PipeRuntimeConnectorCriticalException must be thrown here
// because the upper layer relies on this to stop all the related pipe tasks
// Other exceptions may cause the subtask to stop forever and can not be restarted
super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
}

/** @return true if the pipe task should be stopped, false otherwise */
Expand Down Expand Up @@ -252,7 +256,7 @@ private boolean onPipeConnectionException(Throwable throwable) {
}
}

// Stop current pipe task if failed to reconnect to
// Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
private final BoundedBlockingPendingQueue<Event> pendingQueue;

private int runningTaskCount;
private int aliveTaskCount;
private int registeredTaskCount;

public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
Expand All @@ -46,7 +46,7 @@ public PipeConnectorSubtaskLifeCycle(
this.pendingQueue = pendingQueue;

runningTaskCount = 0;
aliveTaskCount = 0;
registeredTaskCount = 0;
}

public PipeConnectorSubtask getSubtask() {
Expand All @@ -58,57 +58,57 @@ public BoundedBlockingPendingQueue<Event> getPendingQueue() {
}

public synchronized void register() {
if (aliveTaskCount < 0) {
throw new IllegalStateException("aliveTaskCount < 0");
if (registeredTaskCount < 0) {
throw new IllegalStateException("registeredTaskCount < 0");
}

if (aliveTaskCount == 0) {
if (registeredTaskCount == 0) {
executor.register(subtask);
runningTaskCount = 0;
}

aliveTaskCount++;
registeredTaskCount++;
LOGGER.info(
"Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}

/**
* Deregister the subtask. If the subtask is the last one, close the subtask.
*
* <p>Note that this method should be called after the subtask is stopped. Otherwise, the
* runningTaskCount might be inconsistent with the aliveTaskCount because of parallel connector
* scheduling.
* runningTaskCount might be inconsistent with the registeredTaskCount because of parallel
* connector scheduling.
*
* @param pipeNameToDeregister pipe name
* @return true if the subtask is out of life cycle, indicating that the subtask should never be
* used again
* @throws IllegalStateException if aliveTaskCount <= 0
* @throws IllegalStateException if registeredTaskCount <= 0
*/
public synchronized boolean deregister(String pipeNameToDeregister) {
if (aliveTaskCount <= 0) {
throw new IllegalStateException("aliveTaskCount <= 0");
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}

subtask.discardEventsOfPipe(pipeNameToDeregister);

try {
if (aliveTaskCount > 1) {
if (registeredTaskCount > 1) {
return false;
}

close();
// This subtask is out of life cycle, should never be used again
return true;
} finally {
aliveTaskCount--;
registeredTaskCount--;
LOGGER.info(
"Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}
}

Expand All @@ -123,10 +123,10 @@ public synchronized void start() {

runningTaskCount++;
LOGGER.info(
"Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}

public synchronized void stop() {
Expand All @@ -140,10 +140,10 @@ public synchronized void stop() {

runningTaskCount--;
LOGGER.info(
"Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
"Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}",
subtask,
runningTaskCount,
aliveTaskCount);
registeredTaskCount);
}

@Override
Expand Down
Loading