Skip to content

Commit

Permalink
feat: Change one thread per retry to use a thread pool (#1898)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
create/tear down too much threads if lots of retries happens

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Nov 30, 2022
1 parent 8e90767 commit 44a4e4d
Showing 1 changed file with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
Expand Down Expand Up @@ -193,6 +195,8 @@ static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsRespo

private final DataWriter parent;
private final AppendContext appendContext;
// Prepare a thread pool
static ExecutorService pool = Executors.newFixedThreadPool(50);

public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
this.parent = parent;
Expand All @@ -213,19 +217,18 @@ public void onFailure(Throwable throwable) {
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
// Use a separate thread to avoid potentially blocking while we are in a callback.
new Thread(
() -> {
try {
// Since default stream appends are not ordered, we can simply retry the
// appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s%n", e);
}
})
.start();
pool.submit(
() -> {
try {
// Since default stream appends are not ordered, we can simply retry the
// appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s%n", e);
}
});
// Mark the existing attempt as done since it's being retried.
done();
return;
Expand All @@ -251,15 +254,14 @@ public void onFailure(Throwable throwable) {
// Retry the remaining valid rows, but using a separate thread to
// avoid potentially blocking while we are in a callback.
if (dataNew.length() > 0) {
new Thread(
() -> {
try {
this.parent.append(new AppendContext(dataNew, 0));
} catch (Exception e2) {
System.out.format("Failed to retry append with filtered rows: %s%n", e2);
}
})
.start();
pool.submit(
() -> {
try {
this.parent.append(new AppendContext(dataNew, 0));
} catch (Exception e2) {
System.out.format("Failed to retry append with filtered rows: %s%n", e2);
}
});
}
return;
}
Expand Down

0 comments on commit 44a4e4d

Please sign in to comment.