From 8f0339790ef5308c7100c0e4b711ff191725c38f Mon Sep 17 00:00:00 2001
From: gunli <24350715@qq.com>
Date: Sun, 21 Jul 2024 23:05:54 +0800
Subject: [PATCH] [INLONG-10675][SDK] Use exponential backoff instead of
 linear backoff retrying in Golang SDK (#10678)

Co-authored-by: gunli
---
 .../dataproxy-sdk-golang/dataproxy/worker.go | 34 +++++++++----------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index c67942ccde7..7eb076e4dbe 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
+	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
 )
 
 const (
@@ -378,6 +379,12 @@ func (w *worker) handleSendData(req *sendDataReq) {
 }
 
 func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
+	// check if we have exceeded the max retry
+	if b.retries > w.options.MaxRetries {
+		b.done(errSendTimeout)
+		return
+	}
+
 	b.lastSendTime = time.Now()
 	b.encode()
 
@@ -470,6 +477,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
 		return
 	}
 
+	batch.retries++
 	go func() {
 		defer func() {
 			if rec := recover(); rec != nil {
@@ -479,22 +487,18 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
 			}
 		}()
 
-		minBackoff := 100 * time.Millisecond
-		maxBackoff := 10 * time.Second
-		jitterPercent := 0.2
-
-		backoff := time.Duration(batch.retries+1) * minBackoff
-		if backoff > maxBackoff {
-			backoff = maxBackoff
+		// use ExponentialBackoff
+		backoff := util.ExponentialBackoff{
+			InitialInterval: 100 * time.Millisecond,
+			MaxInterval:     10 * time.Second,
+			Multiplier:      2.0,
+			Randomization:   0.2,
 		}
 
-		// rand will be panic in concurrent call, so we create a new one at each call
-		jitterRand := rand.New(rand.NewSource(time.Now().UnixNano()))
-		jitter := jitterRand.Float64() * float64(backoff) * jitterPercent
-		backoff += time.Duration(jitter)
+		waitTime := backoff.Next(batch.retries)
 
 		select {
-		case <-time.After(backoff):
+		case <-time.After(waitTime):
 			// check if the worker is closed again
 			if w.getState() == stateClosed {
 				batch.done(errSendTimeout)
@@ -512,12 +516,6 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
 }
 
 func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
-	batch.retries++
-	if batch.retries >= w.options.MaxRetries {
-		batch.done(errSendTimeout)
-		return
-	}
-
 	// retry
 	w.log.Debug("retry batch...", ", workerID:", w.index, ", batchID:", batch.batchID)
 	w.metrics.incRetry(w.indexStr)
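
For readers unfamiliar with the new helper, below is a minimal, self-contained Go sketch of an exponential-backoff calculator with the same shape as the util.ExponentialBackoff value used in the diff (InitialInterval, MaxInterval, Multiplier, Randomization, and a Next(retries) method). The field names and the call site mirror the patch; the standalone type and the internal formula here are illustrative assumptions, not the SDK's actual util implementation.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// ExponentialBackoff is an illustrative sketch of the helper used in the
// patch (util.ExponentialBackoff). Field names match the diff above; the
// implementation below is an assumption for clarity, not the SDK's code.
type ExponentialBackoff struct {
	InitialInterval time.Duration // wait before the first retry
	MaxInterval     time.Duration // upper bound on the computed wait
	Multiplier      float64       // growth factor applied per retry
	Randomization   float64       // jitter ratio, e.g. 0.2 = ±20%
}

// Next returns the wait time for the given retry attempt:
// InitialInterval * Multiplier^retries, capped at MaxInterval,
// with random jitter of up to ±Randomization applied.
func (b ExponentialBackoff) Next(retries int) time.Duration {
	interval := float64(b.InitialInterval)
	for i := 0; i < retries; i++ {
		interval *= b.Multiplier
		if interval >= float64(b.MaxInterval) {
			interval = float64(b.MaxInterval)
			break
		}
	}

	// apply jitter in the range [-Randomization, +Randomization]
	delta := (rand.Float64()*2 - 1) * b.Randomization * interval
	return time.Duration(interval + delta)
}

func main() {
	// same settings as the patched backoffRetry
	backoff := ExponentialBackoff{
		InitialInterval: 100 * time.Millisecond,
		MaxInterval:     10 * time.Second,
		Multiplier:      2.0,
		Randomization:   0.2,
	}
	for retries := 0; retries < 8; retries++ {
		fmt.Printf("retry %d -> wait ~%v\n", retries, backoff.Next(retries))
	}
}
```

With these settings the wait roughly doubles per attempt (100ms, 200ms, 400ms, ...) and reaches the 10s cap after about 7 retries, whereas the previous linear scheme ((retries+1) * 100ms) would have needed on the order of 100 retries to back off that far; the ±20% jitter keeps concurrent workers from retrying in lockstep.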