Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make retry on ratelimit configurable #522

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,31 +204,6 @@ type Client interface {
GetVersion(ctx context.Context) (string, error)
}

// SearchResult contains the result from the Search API of the client.
// IDs holds the auto-generated id values for the entities.
// Fields contains the data of the specified `outputFields`, or all columns if none were specified.
// Scores is actually the distance between the vector the current record contains and the search target vector.
type SearchResult struct {
ResultCount int // the returning entry count
IDs entity.Column // auto generated id, can be mapped to the columns from `Insert` API
Fields ResultSet // output field data
Scores []float32 // distance to the target vector
Err error // search error if any
}

// ResultSet is a named slice type whose elements are entity.Column values.
type ResultSet []entity.Column

// GetColumn looks up a column in the result set by field name.
// It returns the first column whose Name() equals fieldName, or nil when
// no column in the set matches.
func (rs ResultSet) GetColumn(fieldName string) entity.Column {
	for idx := 0; idx < len(rs); idx++ {
		if candidate := rs[idx]; candidate.Name() == fieldName {
			return candidate
		}
	}
	return nil
}

// Compile-time assertion that *GrpcClient satisfies the Client interface.
var _ Client = (*GrpcClient)(nil)

Expand Down
28 changes: 25 additions & 3 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@ type Config struct {

parsedAddress *url.URL

RetryRateLimit *RetryRateLimitOption // option for retry on rate limit interceptor

DisableConn bool

flags uint64 // internal flags
}

// RetryRateLimitOption holds the settings handed to RetryOnRateLimitInterceptor
// when the client's dial options are built.
type RetryRateLimitOption struct {
// MaxRetry is forwarded as the interceptor's maxRetry argument: it bounds the
// number of attempts made for a call, and 0 makes the interceptor invoke the
// call once without any retry handling.
MaxRetry uint
// MaxBackoff is forwarded as the interceptor's maxBackoff argument: any
// computed wait between attempts that exceeds it is reduced to this value.
MaxBackoff time.Duration
}

// Copy a new config, dialOption may shared with old config.
func (c *Config) Copy() Config {
newConfig := Config{
Expand Down Expand Up @@ -147,9 +154,7 @@ func (c *Config) getDialOption() []grpc.DialOption {
return 60 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
}),
grpc_retry.WithCodes(codes.Unavailable, codes.ResourceExhausted)),
RetryOnRateLimitInterceptor(75, func(ctx context.Context, attempt uint) time.Duration {
return 10 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
}),
c.getRetryOnRateLimitInterceptor(),
))

options = append(options, grpc.WithChainUnaryInterceptor(
Expand All @@ -158,6 +163,23 @@ func (c *Config) getDialOption() []grpc.DialOption {
return options
}

// getRetryOnRateLimitInterceptor builds the unary client interceptor that
// retries calls rejected by rate limiting.
//
// When c.RetryRateLimit is nil it is populated with the defaults from
// defaultRetryRateLimitOption, so the config reflects the values in effect.
// The interceptor is created with the configured MaxRetry and MaxBackoff and
// an exponential backoff function of 10ms * 3^attempt.
func (c *Config) getRetryOnRateLimitInterceptor() grpc.UnaryClientInterceptor {
	if c.RetryRateLimit == nil {
		c.RetryRateLimit = c.defaultRetryRateLimitOption()
	}

	return RetryOnRateLimitInterceptor(c.RetryRateLimit.MaxRetry, c.RetryRateLimit.MaxBackoff, func(ctx context.Context, attempt uint) time.Duration {
		const base = 10 * time.Millisecond
		// Largest multiplier for which base*multiplier still fits in an int64.
		const maxFactor = float64(math.MaxInt64 / int64(base))

		factor := math.Pow(3, float64(attempt))
		// 3^attempt grows past what a time.Duration can hold well before the
		// default 75 attempts are exhausted (the product overflows from
		// attempt 26). Saturate instead of letting the multiplication wrap to
		// a zero or negative wait, which would retry with no backoff at all.
		// The interceptor still applies MaxBackoff on top of this value.
		if factor >= maxFactor {
			return time.Duration(math.MaxInt64)
		}
		return base * time.Duration(factor)
	})
}

// defaultRetryRateLimitOption builds the fallback rate-limit retry settings:
// a MaxRetry of 75 and a MaxBackoff of 3 seconds.
func (c *Config) defaultRetryRateLimitOption() *RetryRateLimitOption {
	opt := new(RetryRateLimitOption)
	opt.MaxRetry = 75
	opt.MaxBackoff = 3 * time.Second
	return opt
}

// addFlags set internal flags
func (c *Config) addFlags(flags uint64) {
c.flags |= flags
Expand Down
13 changes: 5 additions & 8 deletions client/rate_limit_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,21 @@ const (
RetryOnRateLimit ctxKey = iota
)

// MaxBackOff is the package-level upper bound (3 seconds) that a single retry
// backoff wait is reduced to when the computed wait exceeds it.
var MaxBackOff = 3 * time.Second

// RetryOnRateLimitInterceptor returns a new retrying unary client interceptor.
func RetryOnRateLimitInterceptor(maxRetry uint, backoffFunc grpc_retry.BackoffFuncContext) grpc.UnaryClientInterceptor {
func RetryOnRateLimitInterceptor(maxRetry uint, maxBackoff time.Duration, backoffFunc grpc_retry.BackoffFuncContext) grpc.UnaryClientInterceptor {
return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if maxRetry == 0 {
return invoker(parentCtx, method, req, reply, cc, opts...)
}
var lastErr error
for attempt := uint(0); attempt < maxRetry; attempt++ {
_, err := waitRetryBackoff(parentCtx, attempt, backoffFunc)
_, err := waitRetryBackoff(parentCtx, attempt, maxBackoff, backoffFunc)
if err != nil {
return err
}
lastErr = invoker(parentCtx, method, req, reply, cc, opts...)
rspStatus := getResultStatus(reply)
if retryOnRateLimit(parentCtx) && rspStatus.GetErrorCode() == common.ErrorCode_RateLimit {
//log.Printf("rate limit retry attempt: %d, backoff for %v, reson: %v\n", attempt, backoff, rspStatus.GetReason())
continue
}
return lastErr
Expand Down Expand Up @@ -102,14 +99,14 @@ func contextErrToGrpcErr(err error) error {
}
}

func waitRetryBackoff(parentCtx context.Context, attempt uint, backoffFunc grpc_retry.BackoffFuncContext) (time.Duration, error) {
func waitRetryBackoff(parentCtx context.Context, attempt uint, maxBackoff time.Duration, backoffFunc grpc_retry.BackoffFuncContext) (time.Duration, error) {
var waitTime time.Duration
if attempt > 0 {
waitTime = backoffFunc(parentCtx, attempt)
}
if waitTime > 0 {
if waitTime > MaxBackOff {
waitTime = MaxBackOff
if waitTime > maxBackoff {
waitTime = maxBackoff
}
timer := time.NewTimer(waitTime)
select {
Expand Down
3 changes: 2 additions & 1 deletion client/rate_limit_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func resetMockInvokeTimes() {

func TestRateLimitInterceptor(t *testing.T) {
maxRetry := uint(3)
inter := RetryOnRateLimitInterceptor(maxRetry, func(ctx context.Context, attempt uint) time.Duration {
maxBackoff := 3 * time.Second
inter := RetryOnRateLimitInterceptor(maxRetry, maxBackoff, func(ctx context.Context, attempt uint) time.Duration {
return 60 * time.Millisecond * time.Duration(math.Pow(2, float64(attempt)))
})

Expand Down
28 changes: 28 additions & 0 deletions client/results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package client

import "github.com/milvus-io/milvus-sdk-go/v2/entity"

// SearchResult contains the result from the Search API of the client.
// IDs holds the auto-generated id values for the entities.
// Fields contains the data of the specified `outputFields`, or all columns if none were specified.
// Scores is actually the distance between the vector the current record contains and the search target vector.
type SearchResult struct {
ResultCount int // the returning entry count
IDs entity.Column // auto generated id, can be mapped to the columns from `Insert` API
Fields ResultSet // output field data
Scores []float32 // distance to the target vector
Err error // search error if any
}

// ResultSet is a named slice type whose elements are entity.Column values.
type ResultSet []entity.Column

// GetColumn looks up a column in the result set by field name.
// It returns the first column whose Name() equals fieldName, or nil when
// no column in the set matches.
func (rs ResultSet) GetColumn(fieldName string) entity.Column {
	for idx := 0; idx < len(rs); idx++ {
		if candidate := rs[idx]; candidate.Name() == fieldName {
			return candidate
		}
	}
	return nil
}