diff --git a/CHANGELOG.md b/CHANGELOG.md index db2ae8c9b0dcb..787569df46509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler * [4519](https://github.com/grafana/loki/pull/4519) **DylanGuedes** and **replay**: Loki: Enable FIFO cache by default * [4520](https://github.com/grafana/loki/pull/4520) **jordanrushing** and **owen-d**: Introduce overrides-exporter module for tenant limits +* [4453](https://github.com/grafana/loki/pull/4453) **liguozhong**: Loki: Implement retry to s3 chunk storage # 2.3.0 (2021/08/06) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 2b00c4f47a093..b6f52612b389f 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1366,6 +1366,20 @@ aws: # CLI flag: -s3.http.ca-file [ca_file: <string> | default = ""] + # Configures backoff for S3 GetObject requests. + backoff_config: + # Minimum duration to back off. + # CLI flag: -s3.min-backoff + [min_period: <duration> | default = 100ms] + + # Maximum duration to back off. + # CLI flag: -s3.max-backoff + [max_period: <duration> | default = 3s] + + # Number of times to back off and retry before failing. + # CLI flag: -s3.max-retries + [max_retries: <int> | default = 5] + # Configure the DynamoDB connection dynamodb: # URL for DynamoDB with escaped Key and Secret encoded. 
If only region is specified as a diff --git a/pkg/storage/chunk/aws/s3_storage_client.go b/pkg/storage/chunk/aws/s3_storage_client.go index 38703825dea1c..5f153c8e6fba3 100644 --- a/pkg/storage/chunk/aws/s3_storage_client.go +++ b/pkg/storage/chunk/aws/s3_storage_client.go @@ -31,6 +31,7 @@ import ( cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws" cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3" "github.com/cortexproject/cortex/pkg/util" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/grafana/loki/pkg/storage/chunk" @@ -76,6 +77,7 @@ type S3Config struct { HTTPConfig HTTPConfig `yaml:"http_config"` SignatureVersion string `yaml:"signature_version"` SSEConfig cortex_s3.SSEConfig `yaml:"sse"` + BackoffConfig backoff.Config `yaml:"backoff_config"` Inject InjectRequestMiddleware `yaml:"-"` } @@ -116,6 +118,10 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.HTTPConfig.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "Set to true to skip verifying the certificate chain and hostname.") f.StringVar(&cfg.HTTPConfig.CAFile, prefix+"s3.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the S3 endpoint.") f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. 
Supported values are: %s.", strings.Join(supportedSignatureVersions, ", "))) + // Backoff flags carry the caller-supplied prefix like every other s3.* flag, so several S3 configs can be registered on one FlagSet. + f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+"s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object") + f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+"s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object") + f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+"s3.max-retries", 5, "Maximum number of times to retry when s3 get Object") } // Validate config and returns error on failure @@ -153,6 +159,7 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig { } type S3ObjectClient struct { + cfg S3Config bucketNames []string S3 s3iface.S3API sseConfig *SSEParsedConfig @@ -182,6 +189,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) { } client := S3ObjectClient{ + cfg: cfg, S3: s3Client, bucketNames: bucketNames, sseConfig: sseCfg, @@ -360,19 +368,28 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re // Map the key into a bucket bucket := a.bucketFromKey(objectKey) - err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - var err error - resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(objectKey), + // Retry GetObject, waiting between attempts, for as long as the backoff is ongoing; stop early on success or once ctx is done. + // NOTE(review): every error is retried, including non-transient ones (e.g. a missing key); consider limiting retries to transient failures. + retries := backoff.New(ctx, a.cfg.BackoffConfig) + err := ctx.Err() + for retries.Ongoing() { + if ctx.Err() != nil { + return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject") + } + err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var requestErr error + resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(objectKey), + }) + return requestErr }) - return err - }) - if err != nil { - return nil, err + if err == nil { + return resp.Body, nil + } + retries.Wait() } - - return resp.Body, nil + return nil, 
errors.Wrap(err, "failed to get s3 object") } // PutObject into the store