Skip to content

Commit

Permalink
S3 Config Improvements (grafana#2831)
Browse files Browse the repository at this point in the history
* established pattern

Signed-off-by: Joe Elliott <number101010@gmail.com>

* First pass http config

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added insecure.  Include bucket names

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Tests pass!

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added url test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added mixed config test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* added sse support

Signed-off-by: Joe Elliott <number101010@gmail.com>

* PutUserMetadata

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Removed comments

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Better error contextualization

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Improved error msgs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added yaml tags and descriptions

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Moved storage client test to where it will work

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Updated docs and changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* removed putUserMetaData

Signed-off-by: Joe Elliott <number101010@gmail.com>

* removed capitalization of error msgs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* errors cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* removed obsolete comment

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Cleaned up comments

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Addressed feedback

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored Jul 8, 2020
1 parent 29107ba commit e4f9647
Showing 1 changed file with 133 additions and 24 deletions.
157 changes: 133 additions & 24 deletions aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package aws

import (
"context"
"crypto/tls"
"flag"
"fmt"
"hash/fnv"
"io"
"net"
"net/http"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
awscommon "github.com/weaveworks/common/aws"
"github.com/weaveworks/common/instrument"
Expand All @@ -38,8 +42,23 @@ func init() {
// S3Config specifies config for storing chunks on AWS S3.
//
// The S3 URL is optional: when present it seeds the AWS configuration and its
// path supplies the default bucket name; the remaining fields are applied on
// top of it.
type S3Config struct {
// S3 is the URL form of the S3 configuration.
S3 flagext.URLValue
BucketNames string
// S3ForcePathStyle forces requests to use path-style addressing.
S3ForcePathStyle bool

// BucketNames is a comma separated list of bucket names to distribute
// chunks over. When set it overrides any bucket taken from the S3 URL.
BucketNames string
// Endpoint is the S3 endpoint to connect to; empty leaves the endpoint unchanged.
Endpoint string `yaml:"endpoint"`
// Region is the AWS region to use; empty leaves the region unchanged.
Region string `yaml:"region"`
// AccessKeyID and SecretAccessKey are static credentials. They must be
// supplied together or not at all.
AccessKeyID string `yaml:"access_key_id"`
SecretAccessKey string `yaml:"secret_access_key"`
// Insecure disables https on the S3 connection.
Insecure bool `yaml:"insecure"`
// SSEEncryption enables AES256 server side encryption on uploaded objects.
SSEEncryption bool `yaml:"sse_encryption"`
// HTTPConfig tunes the http.Transport used to talk to S3.
HTTPConfig HTTPConfig `yaml:"http_config"`
}

// HTTPConfig stores the http.Transport configuration
// used by the S3 client.
type HTTPConfig struct {
// IdleConnTimeout is the maximum amount of time an idle connection will be held open.
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
// ResponseHeaderTimeout, if non-zero, is the amount of time to wait for a
// server's response headers after fully writing the request.
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
// InsecureSkipVerify is passed straight to tls.Config.InsecureSkipVerify:
// when true, the server's certificate chain and hostname are not verified.
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -53,45 +72,134 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
f.BoolVar(&cfg.S3ForcePathStyle, prefix+"s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
f.StringVar(&cfg.BucketNames, prefix+"s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")

f.StringVar(&cfg.Endpoint, prefix+"s3.endpoint", "", "S3 Endpoint to connect to.")
f.StringVar(&cfg.Region, prefix+"s3.region", "", "AWS region to use.")
f.StringVar(&cfg.AccessKeyID, prefix+"s3.access-key-id", "", "AWS Access Key ID")
f.StringVar(&cfg.SecretAccessKey, prefix+"s3.secret-access-key", "", "AWS Secret Access Key")
f.BoolVar(&cfg.Insecure, prefix+"s3.insecure", false, "Disable https on s3 connection.")
f.BoolVar(&cfg.SSEEncryption, prefix+"s3.sse-encryption", false, "Enable AES256 AWS Server Side Encryption")

f.DurationVar(&cfg.HTTPConfig.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The maximum amount of time an idle connection will be held open.")
f.DurationVar(&cfg.HTTPConfig.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 0, "If non-zero, specifies the amount of time to wait for a server's response headers after fully writing the request.")
f.BoolVar(&cfg.HTTPConfig.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "Set to false to skip verifying the certificate chain and hostname.")
}

// S3ObjectClient is an S3-backed object client. Objects are spread across
// bucketNames, and S3 is the API handle used to issue requests.
type S3ObjectClient struct {
bucketNames []string
S3 s3iface.S3API
delimiter string
bucketNames []string
S3 s3iface.S3API
delimiter string
// sseEncryption is nil unless server side encryption is enabled, in which
// case it points at the algorithm name sent with each upload.
sseEncryption *string
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
//
// The AWS configuration and the list of bucket names are derived from cfg by
// buildS3Config; an AWS session is then opened from that configuration.
// delimiter is stored on the returned client unchanged. A failure in either
// step is returned wrapped with a message naming the step.
func NewS3ObjectClient(cfg S3Config, delimiter string) (*S3ObjectClient, error) {
if cfg.S3.URL == nil {
return nil, fmt.Errorf("no URL specified for S3")
}
s3Config, err := awscommon.ConfigFromURL(cfg.S3.URL)
s3Config, bucketNames, err := buildS3Config(cfg)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to build s3 config")
}

s3Config = s3Config.WithS3ForcePathStyle(cfg.S3ForcePathStyle) // support for Path Style S3 url if has the flag

s3Config = s3Config.WithMaxRetries(0) // We do our own retries, so we can monitor them
s3Config = s3Config.WithHTTPClient(&http.Client{Transport: defaultTransport})
sess, err := session.NewSession(s3Config)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create new s3 session")
}

s3Client := s3.New(sess)
bucketNames := []string{strings.TrimPrefix(cfg.S3.URL.Path, "/")}
if cfg.BucketNames != "" {
bucketNames = strings.Split(cfg.BucketNames, ",") // comma separated list of bucket names

// sseEncryption stays nil when server side encryption is disabled; it is
// only pointed at "AES256" when cfg.SSEEncryption is set.
var sseEncryption *string
if cfg.SSEEncryption {
sseEncryption = aws.String("AES256")
}

client := S3ObjectClient{
S3: s3Client,
bucketNames: bucketNames,
delimiter: delimiter,
S3: s3Client,
bucketNames: bucketNames,
delimiter: delimiter,
sseEncryption: sseEncryption,
}
return &client, nil
}

// buildS3Config translates cfg into an AWS SDK configuration plus the list of
// buckets that objects are spread over.
//
// When cfg.S3.URL is set it seeds the configuration via
// awscommon.ConfigFromURL and its path supplies the default bucket name;
// otherwise the configuration starts from a placeholder region and anonymous
// credentials. The explicit fields on cfg (path style, endpoint, insecure,
// region, static credentials, http settings) are then layered on top.
// cfg.BucketNames, when non-empty, replaces the bucket taken from the URL.
//
// An error is returned if the URL cannot be converted, if only one of the
// access key ID / secret access key is supplied, or if no non-empty bucket
// name results.
func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
	var s3Config *aws.Config
	var err error

	// if an s3 url is passed use it to initialize the s3Config and then override with any additional params
	if cfg.S3.URL != nil {
		s3Config, err = awscommon.ConfigFromURL(cfg.S3.URL)
		if err != nil {
			return nil, nil, err
		}
	} else {
		s3Config = &aws.Config{}
		s3Config = s3Config.WithRegion("dummy")
		s3Config = s3Config.WithCredentials(credentials.AnonymousCredentials)
	}

	s3Config = s3Config.WithMaxRetries(0)                          // We do our own retries, so we can monitor them
	s3Config = s3Config.WithS3ForcePathStyle(cfg.S3ForcePathStyle) // support for Path Style S3 url if has the flag

	if cfg.Endpoint != "" {
		s3Config = s3Config.WithEndpoint(cfg.Endpoint)
	}

	if cfg.Insecure {
		s3Config = s3Config.WithDisableSSL(true)
	}

	if cfg.Region != "" {
		s3Config = s3Config.WithRegion(cfg.Region)
	}

	// Static credentials are all-or-nothing: exactly one of the pair being
	// set is a configuration mistake.
	if (cfg.AccessKeyID == "") != (cfg.SecretAccessKey == "") {
		return nil, nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
	}

	if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
		creds := credentials.NewStaticCredentials(cfg.AccessKeyID, cfg.SecretAccessKey, "")
		s3Config = s3Config.WithCredentials(creds)
	}

	// While extending S3 configuration this http config was copied in order
	// to maintain backwards compatibility with previous versions of Cortex while providing
	// more flexible configuration of the http client
	// https://github.com/weaveworks/common/blob/4b1847531bc94f54ce5cf210a771b2a86cd34118/aws/config.go#L23
	s3Config = s3Config.WithHTTPClient(&http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   30 * time.Second,
				KeepAlive: 30 * time.Second,
				DualStack: true,
			}).DialContext,
			MaxIdleConns:          100,
			IdleConnTimeout:       cfg.HTTPConfig.IdleConnTimeout,
			MaxIdleConnsPerHost:   100,
			TLSHandshakeTimeout:   3 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
			ResponseHeaderTimeout: cfg.HTTPConfig.ResponseHeaderTimeout,
			TLSClientConfig:       &tls.Config{InsecureSkipVerify: cfg.HTTPConfig.InsecureSkipVerify},
		},
	})

	// bucketnames
	var bucketNames []string
	if cfg.S3.URL != nil {
		// A URL without a path carries no bucket. Recording "" here would let
		// an empty bucket name slip past the length check below.
		if name := strings.TrimPrefix(cfg.S3.URL.Path, "/"); name != "" {
			bucketNames = []string{name}
		}
	}

	if cfg.BucketNames != "" {
		// The explicit list replaces whatever the URL supplied. Surrounding
		// whitespace and empty entries (e.g. "a, b" or "a,,b") are dropped so
		// they do not become unusable bucket names.
		bucketNames = nil
		for _, name := range strings.Split(cfg.BucketNames, ",") { // comma separated list of bucket names
			if name = strings.TrimSpace(name); name != "" {
				bucketNames = append(bucketNames, name)
			}
		}
	}

	if len(bucketNames) == 0 {
		return nil, nil, errors.New("at least one bucket name must be specified")
	}

	return s3Config, bucketNames, nil
}

// Stop is a no-op. It is present so that S3ObjectClient fulfills the
// chunk.ObjectClient interface.
func (a *S3ObjectClient) Stop() {
}

Expand Down Expand Up @@ -160,9 +268,10 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
_, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Body: object,
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
Body: object,
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
ServerSideEncryption: a.sseEncryption,
})
return err
})
Expand Down

0 comments on commit e4f9647

Please sign in to comment.