[INLONG-10675][SDK] Use exponential backoff instead of linear backoff when retrying in Golang SDK (apache#10678)
Co-authored-by: gunli <gunli@tencent.com>
gunli and gunli authored Jul 21, 2024
1 parent 4db1daf commit 8f03397
Showing 1 changed file with 16 additions and 18 deletions.
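
The diff below swaps a hand-rolled linear backoff for a util.ExponentialBackoff helper from the SDK's util package. That helper's implementation is not part of this commit; the following is only a minimal sketch consistent with the field names and the Next(batch.retries) call visible in the diff, assuming Next takes the retry count as an int and returns a time.Duration:

package util

import (
	"math"
	"math/rand"
	"time"
)

// ExponentialBackoff yields wait times that grow geometrically from
// InitialInterval by Multiplier, capped at MaxInterval, with a random
// jitter of up to ±Randomization applied to each value.
type ExponentialBackoff struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
	Randomization   float64
}

// Next returns the wait time for the given retry attempt.
func (b ExponentialBackoff) Next(retries int) time.Duration {
	// grow geometrically and cap at MaxInterval
	backoff := float64(b.InitialInterval) * math.Pow(b.Multiplier, float64(retries))
	if backoff > float64(b.MaxInterval) {
		backoff = float64(b.MaxInterval)
	}
	// spread the value by ±Randomization; the package-level rand functions
	// are safe for concurrent use, unlike a shared *rand.Rand
	delta := b.Randomization * backoff
	return time.Duration(backoff - delta + rand.Float64()*2*delta)
}
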
@@ -32,6 +32,7 @@ import (
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
)

const (
@@ -378,6 +379,12 @@ func (w *worker) handleSendData(req *sendDataReq) {
}

func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
+ // check if we have exceeded the max retry
+ if b.retries > w.options.MaxRetries {
+ b.done(errSendTimeout)
+ return
+ }
+
b.lastSendTime = time.Now()
b.encode()

@@ -470,6 +477,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
return
}

+ batch.retries++
go func() {
defer func() {
if rec := recover(); rec != nil {
@@ -479,22 +487,18 @@
}
}()

- minBackoff := 100 * time.Millisecond
- maxBackoff := 10 * time.Second
- jitterPercent := 0.2
-
- backoff := time.Duration(batch.retries+1) * minBackoff
- if backoff > maxBackoff {
- backoff = maxBackoff
+ // use ExponentialBackoff
+ backoff := util.ExponentialBackoff{
+ InitialInterval: 100 * time.Millisecond,
+ MaxInterval: 10 * time.Second,
+ Multiplier: 2.0,
+ Randomization: 0.2,
}

- // rand will be panic in concurrent call, so we create a new one at each call
- jitterRand := rand.New(rand.NewSource(time.Now().UnixNano()))
- jitter := jitterRand.Float64() * float64(backoff) * jitterPercent
- backoff += time.Duration(jitter)
+ waitTime := backoff.Next(batch.retries)

select {
- case <-time.After(backoff):
+ case <-time.After(waitTime):
// check if the worker is closed again
if w.getState() == stateClosed {
batch.done(errSendTimeout)
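
Ignoring jitter, the removed schedule grew linearly as (retries+1) × 100ms (100ms, 200ms, 300ms, …), while the new policy grows roughly as 100ms × 2^retries (100ms, 200ms, 400ms, 800ms, …) until it hits the 10s cap; the exact value of the first step depends on how Next indexes the attempt. A small standalone snippet that prints both schedules side by side for comparison:

package main

import (
	"fmt"
	"time"
)

func main() {
	minBackoff := 100 * time.Millisecond
	maxBackoff := 10 * time.Second

	for retries := 0; retries < 8; retries++ {
		// old linear schedule: (retries+1) * 100ms, capped at 10s, jitter omitted
		linear := time.Duration(retries+1) * minBackoff
		if linear > maxBackoff {
			linear = maxBackoff
		}
		// new exponential schedule: 100ms * 2^retries, capped at 10s, jitter omitted
		exponential := minBackoff << uint(retries)
		if exponential > maxBackoff {
			exponential = maxBackoff
		}
		fmt.Printf("retry %d: linear=%v exponential=%v\n", retries, linear, exponential)
	}
}
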
@@ -512,12 +516,6 @@
}

func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
- batch.retries++
- if batch.retries >= w.options.MaxRetries {
- batch.done(errSendTimeout)
- return
- }
-
// retry
w.log.Debug("retry batch...", ", workerID:", w.index, ", batchID:", batch.batchID)
w.metrics.incRetry(w.indexStr)
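
With the check removed from handleRetry, the retry bookkeeping is now split: backoffRetry increments batch.retries before scheduling the next attempt, and sendBatch drops the batch once the counter exceeds MaxRetries (the new condition uses > where the removed one used >=, so with the same MaxRetries the batch effectively gets one additional retry attempt). A self-contained toy loop, using illustrative names rather than the SDK's worker code, showing the same accounting:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errSendTimeout = errors.New("send timeout")

func main() {
	const maxRetries = 3
	retries := 0
	send := func() error { return errors.New("proxy unreachable") } // always fails in this toy

	for {
		// cap check at the top of the send path, as in sendBatch above
		if retries > maxRetries {
			fmt.Println("giving up:", errSendTimeout)
			return
		}
		if err := send(); err == nil {
			fmt.Println("sent after", retries, "retries")
			return
		}
		// bump the counter before waiting, as in backoffRetry above
		retries++
		wait := 100 * time.Millisecond << uint(retries-1) // exponential growth, jitter omitted
		if wait > 10*time.Second {
			wait = 10 * time.Second
		}
		fmt.Printf("attempt %d failed, waiting %v\n", retries, wait)
		time.Sleep(wait)
	}
}
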
