From 1deb6eb5b8b7d0edca76e2c2fff30cd5e643c20f Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Fri, 21 Jul 2023 11:28:18 +0800 Subject: [PATCH] Make retry on ratelimit configurable Signed-off-by: Congqi Xia --- client/client.go | 25 ------------------------ client/config.go | 28 ++++++++++++++++++++++++--- client/rate_limit_interceptor.go | 13 +++++-------- client/rate_limit_interceptor_test.go | 3 ++- client/results.go | 28 +++++++++++++++++++++++++++ 5 files changed, 60 insertions(+), 37 deletions(-) create mode 100644 client/results.go diff --git a/client/client.go b/client/client.go index 2cdfe301..9a74dced 100644 --- a/client/client.go +++ b/client/client.go @@ -204,31 +204,6 @@ type Client interface { GetVersion(ctx context.Context) (string, error) } -// SearchResult contains the result from Search api of client -// IDs is the auto generated id values for the entities -// Fields contains the data of `outputFieleds` specified or all columns if non -// Scores is actually the distance between the vector current record contains and the search target vector -type SearchResult struct { - ResultCount int // the returning entry count - IDs entity.Column // auto generated id, can be mapped to the columns from `Insert` API - Fields ResultSet // output field data - Scores []float32 // distance to the target vector - Err error // search error if any -} - -// ResultSet is an alias type for column slice. -type ResultSet []entity.Column - -// GetColumn returns column with provided field name. -func (rs ResultSet) GetColumn(fieldName string) entity.Column { - for _, column := range rs { - if column.Name() == fieldName { - return column - } - } - return nil -} - // Check if GrpcClient implement Client. var _ Client = &GrpcClient{} diff --git a/client/config.go b/client/config.go index 4386d672..241cc18a 100644 --- a/client/config.go +++ b/client/config.go @@ -63,11 +63,18 @@ type Config struct { parsedAddress *url.URL + RetryRateLimit *RetryRateLimitOption // option for retry on rate limit inteceptor + DisableConn bool flags uint64 // internal flags } +type RetryRateLimitOption struct { + MaxRetry uint + MaxBackoff time.Duration +} + // Copy a new config, dialOption may shared with old config. func (c *Config) Copy() Config { newConfig := Config{ @@ -147,9 +154,7 @@ func (c *Config) getDialOption() []grpc.DialOption { return 60 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt))) }), grpc_retry.WithCodes(codes.Unavailable, codes.ResourceExhausted)), - RetryOnRateLimitInterceptor(75, func(ctx context.Context, attempt uint) time.Duration { - return 10 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt))) - }), + c.getRetryOnRateLimitInterceptor(), )) options = append(options, grpc.WithChainUnaryInterceptor( @@ -158,6 +163,23 @@ func (c *Config) getDialOption() []grpc.DialOption { return options } +func (c *Config) getRetryOnRateLimitInterceptor() grpc.UnaryClientInterceptor { + if c.RetryRateLimit == nil { + c.RetryRateLimit = c.defaultRetryRateLimitOption() + } + + return RetryOnRateLimitInterceptor(c.RetryRateLimit.MaxRetry, c.RetryRateLimit.MaxBackoff, func(ctx context.Context, attempt uint) time.Duration { + return 10 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt))) + }) +} + +func (c *Config) defaultRetryRateLimitOption() *RetryRateLimitOption { + return &RetryRateLimitOption{ + MaxRetry: 75, + MaxBackoff: 3 * time.Second, + } +} + // addFlags set internal flags func (c *Config) addFlags(flags uint64) { c.flags |= flags diff --git a/client/rate_limit_interceptor.go b/client/rate_limit_interceptor.go index c8d112e4..ff728be0 100644 --- a/client/rate_limit_interceptor.go +++ b/client/rate_limit_interceptor.go @@ -37,24 +37,21 @@ const ( RetryOnRateLimit ctxKey = iota ) -var MaxBackOff = 3 * time.Second - // RetryOnRateLimitInterceptor returns a new retrying unary client interceptor. -func RetryOnRateLimitInterceptor(maxRetry uint, backoffFunc grpc_retry.BackoffFuncContext) grpc.UnaryClientInterceptor { +func RetryOnRateLimitInterceptor(maxRetry uint, maxBackoff time.Duration, backoffFunc grpc_retry.BackoffFuncContext) grpc.UnaryClientInterceptor { return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { if maxRetry == 0 { return invoker(parentCtx, method, req, reply, cc, opts...) } var lastErr error for attempt := uint(0); attempt < maxRetry; attempt++ { - _, err := waitRetryBackoff(parentCtx, attempt, backoffFunc) + _, err := waitRetryBackoff(parentCtx, attempt, maxBackoff, backoffFunc) if err != nil { return err } lastErr = invoker(parentCtx, method, req, reply, cc, opts...) rspStatus := getResultStatus(reply) if retryOnRateLimit(parentCtx) && rspStatus.GetErrorCode() == common.ErrorCode_RateLimit { - //log.Printf("rate limit retry attempt: %d, backoff for %v, reson: %v\n", attempt, backoff, rspStatus.GetReason()) continue } return lastErr @@ -102,14 +99,14 @@ func contextErrToGrpcErr(err error) error { } } -func waitRetryBackoff(parentCtx context.Context, attempt uint, backoffFunc grpc_retry.BackoffFuncContext) (time.Duration, error) { +func waitRetryBackoff(parentCtx context.Context, attempt uint, maxBackoff time.Duration, backoffFunc grpc_retry.BackoffFuncContext) (time.Duration, error) { var waitTime time.Duration if attempt > 0 { waitTime = backoffFunc(parentCtx, attempt) } if waitTime > 0 { - if waitTime > MaxBackOff { - waitTime = MaxBackOff + if waitTime > maxBackoff { + waitTime = maxBackoff } timer := time.NewTimer(waitTime) select { diff --git a/client/rate_limit_interceptor_test.go b/client/rate_limit_interceptor_test.go index af5a9494..784ee867 100644 --- a/client/rate_limit_interceptor_test.go +++ b/client/rate_limit_interceptor_test.go @@ -43,7 +43,8 @@ func resetMockInvokeTimes() { func TestRateLimitInterceptor(t *testing.T) { maxRetry := uint(3) - inter := RetryOnRateLimitInterceptor(maxRetry, func(ctx context.Context, attempt uint) time.Duration { + maxBackoff := 3 * time.Second + inter := RetryOnRateLimitInterceptor(maxRetry, maxBackoff, func(ctx context.Context, attempt uint) time.Duration { return 60 * time.Millisecond * time.Duration(math.Pow(2, float64(attempt))) }) diff --git a/client/results.go b/client/results.go new file mode 100644 index 00000000..2fde53cd --- /dev/null +++ b/client/results.go @@ -0,0 +1,28 @@ +package client + +import "github.com/milvus-io/milvus-sdk-go/v2/entity" + +// SearchResult contains the result from Search api of client +// IDs is the auto generated id values for the entities +// Fields contains the data of `outputFieleds` specified or all columns if non +// Scores is actually the distance between the vector current record contains and the search target vector +type SearchResult struct { + ResultCount int // the returning entry count + IDs entity.Column // auto generated id, can be mapped to the columns from `Insert` API + Fields ResultSet // output field data + Scores []float32 // distance to the target vector + Err error // search error if any +} + +// ResultSet is an alias type for column slice. +type ResultSet []entity.Column + +// GetColumn returns column with provided field name. +func (rs ResultSet) GetColumn(fieldName string) entity.Column { + for _, column := range rs { + if column.Name() == fieldName { + return column + } + } + return nil +}