Skip to content

Commit

Permalink
[querier] s3: add getObject retry (#4453)
Browse files Browse the repository at this point in the history
* [querier] s3: add getObject retry

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry #4452

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
liguozhong and owen-d authored Oct 25, 2021
1 parent e0b4e25 commit 6fcd02b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler
* [4519](https://github.com/grafana/loki/pull/4519) **DylanGuedes** and **replay**: Loki: Enable FIFO cache by default
* [4520](https://github.com/grafana/loki/pull/4520) **jordanrushing** and **owen-d**: Introduce overrides-exporter module for tenant limits
* [4453](https://github.com/grafana/loki/pull/4453) **liguozhong**: Loki: Implement retry to s3 chunk storage

# 2.3.0 (2021/08/06)

Expand Down
14 changes: 14 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,20 @@ aws:
# CLI flag: -s3.http.ca-file
[ca_file: <string> | default = ""]
# Configures back off when s3 get Object.
backoff_config:
# Minimum duration to back off.
# CLI flag: -s3.backoff-min-period
[min_period: <duration> | default = 100ms]
# The duration to back off.
# CLI flag: -s3.backoff-max-period
[max_period: <duration> | default = 3s]
# Number of times to back off and retry before failing.
# CLI flag: -s3.backoff-retries
[max_retries: <int> | default = 5]
# Configure the DynamoDB connection
dynamodb:
# URL for DynamoDB with escaped Key and Secret encoded. If only region is specified as a
Expand Down
36 changes: 25 additions & 11 deletions pkg/storage/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws"
cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -76,6 +77,7 @@ type S3Config struct {
HTTPConfig HTTPConfig `yaml:"http_config"`
SignatureVersion string `yaml:"signature_version"`
SSEConfig cortex_s3.SSEConfig `yaml:"sse"`
BackoffConfig backoff.Config `yaml:"backoff_config"`

Inject InjectRequestMiddleware `yaml:"-"`
}
Expand Down Expand Up @@ -116,6 +118,9 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.HTTPConfig.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "Set to true to skip verifying the certificate chain and hostname.")
f.StringVar(&cfg.HTTPConfig.CAFile, prefix+"s3.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the S3 endpoint.")
f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. Supported values are: %s.", strings.Join(supportedSignatureVersions, ", ")))
f.DurationVar(&cfg.BackoffConfig.MinBackoff, "s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object")
f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object")
f.IntVar(&cfg.BackoffConfig.MaxRetries, "s3.max-retries", 5, "Maximum number of times to retry when s3 get Object")
}

// Validate config and returns error on failure
Expand Down Expand Up @@ -153,6 +158,7 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig {
}

type S3ObjectClient struct {
cfg S3Config
bucketNames []string
S3 s3iface.S3API
sseConfig *SSEParsedConfig
Expand Down Expand Up @@ -182,6 +188,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
}

client := S3ObjectClient{
cfg: cfg,
S3: s3Client,
bucketNames: bucketNames,
sseConfig: sseCfg,
Expand Down Expand Up @@ -360,19 +367,26 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
// Map the key into a bucket
bucket := a.bucketFromKey(objectKey)

err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
retries := backoff.New(ctx, a.cfg.BackoffConfig)
err := ctx.Err()
for retries.Ongoing() {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject")
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
return requestErr
})
return err
})
if err != nil {
return nil, err
if err == nil {
return resp.Body, nil
}
retries.Wait()
}

return resp.Body, nil
return nil, errors.Wrap(err, "failed to get s3 object")
}

// PutObject into the store
Expand Down

0 comments on commit 6fcd02b

Please sign in to comment.