Skip to content

Commit

Permalink
support for registering custom index clients, added new methods to ob…
Browse files Browse the repository at this point in the history
…ject stores (grafana#2049)

* support for registering custom index clients, added new methods to object store

NewIndexClient accepts factory methods for creating custom index clients
added new methods to object stores to work on objects(io.Reader) instead of just chunks

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* split S3 config from DynamoDB config and updated docs

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* removed unwanted code

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* added List method to azure and addressed other feedback in PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* addressed some of the feedback from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* fixed an issue with reporting errors in PutObject for GCS object store

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
sandeepsukhani authored Feb 7, 2020
1 parent 031709a commit 7279221
Show file tree
Hide file tree
Showing 14 changed files with 685 additions and 220 deletions.
10 changes: 2 additions & 8 deletions aws/dynamodb_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,13 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
// StorageConfig specifies config for storing data on AWS.
type StorageConfig struct {
DynamoDBConfig
S3 flagext.URLValue
BucketNames string
S3ForcePathStyle bool
S3Config `yaml:",inline"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) {
cfg.DynamoDBConfig.RegisterFlags(f)

f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
f.BoolVar(&cfg.S3ForcePathStyle, "s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
f.StringVar(&cfg.BucketNames, "s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
cfg.S3Config.RegisterFlags(f)
}

type dynamoDBStorageClient struct {
Expand Down
2 changes: 1 addition & 1 deletion aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var Fixtures = []testutils.Fixture{
batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest,
schemaCfg: schemaConfig,
}
object := &s3ObjectClient{
object := &S3ObjectClient{
S3: newMockS3(),
}
return index, object, table, schemaConfig, nil
Expand Down
154 changes: 117 additions & 37 deletions aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package aws
import (
"bytes"
"context"
"flag"
"fmt"
"hash/fnv"
"io"
"io/ioutil"
"strings"

Expand All @@ -16,6 +18,7 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
awscommon "github.com/weaveworks/common/aws"
"github.com/weaveworks/common/instrument"
)
Expand All @@ -33,13 +36,33 @@ func init() {
s3RequestDuration.Register()
}

type s3ObjectClient struct {
// S3Config specifies config for storing chunks on AWS S3.
type S3Config struct {
	// S3 is the endpoint URL, with escaped key and secret encoded in it.
	// "inmemory:///<bucket-name>" selects a mock in-memory implementation.
	S3 flagext.URLValue
	// BucketNames is a comma-separated list of buckets; chunks are spread
	// evenly across them and this overrides any bucket named in S3.
	BucketNames string
	// S3ForcePathStyle forces path-style (bucket-in-path) request addressing
	// instead of virtual-hosted-style addressing.
	S3ForcePathStyle bool
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *S3Config) RegisterFlags(f *flag.FlagSet) {
	// Delegate with an empty prefix so the flags keep their historical
	// "s3.*" names.
	cfg.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given
// FlagSet with a specified prefix (e.g. "" yields "s3.url", "foo." yields
// "foo.s3.url"), allowing several S3Configs to coexist in one FlagSet.
func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.Var(&cfg.S3, prefix+"s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+
		"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
	f.BoolVar(&cfg.S3ForcePathStyle, prefix+"s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
	f.StringVar(&cfg.BucketNames, prefix+"s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
}

// S3ObjectClient is an S3-backed object store client for chunks and objects.
type S3ObjectClient struct {
	// bucketNames are the candidate buckets; keys are hashed across them
	// (see bucketFromKey). Empty means the bucket is taken from the key's
	// request with an empty bucket name.
	bucketNames []string
	S3          s3iface.S3API
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) {
func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
if cfg.S3.URL == nil {
return nil, fmt.Errorf("no URL specified for S3")
}
Expand All @@ -60,50 +83,40 @@ func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.O
if cfg.BucketNames != "" {
bucketNames = strings.Split(cfg.BucketNames, ",") // comma separated list of bucket names
}
client := s3ObjectClient{
client := S3ObjectClient{
S3: s3Client,
bucketNames: bucketNames,
}
return client, nil
return &client, nil
}

func (a s3ObjectClient) Stop() {
func (a *S3ObjectClient) Stop() {
}

func (a s3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
func (a *S3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
return util.GetParallelChunks(ctx, chunks, a.getChunk)
}

func (a s3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
var resp *s3.GetObjectOutput

// Map the key into a bucket
key := c.ExternalKey()
bucket := a.bucketFromKey(key)

err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
return err
})
func (a *S3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
readCloser, err := a.GetObject(ctx, c.ExternalKey())
if err != nil {
return chunk.Chunk{}, err
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)

defer readCloser.Close()

buf, err := ioutil.ReadAll(readCloser)
if err != nil {
return chunk.Chunk{}, err
}

if err := c.Decode(decodeContext, buf); err != nil {
return chunk.Chunk{}, err
}
return c, nil
}

func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
func (a *S3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
var (
s3ChunkKeys []string
s3ChunkBufs [][]byte
Expand All @@ -123,7 +136,7 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
incomingErrors := make(chan error)
for i := range s3ChunkBufs {
go func(i int) {
incomingErrors <- a.putS3Chunk(ctx, s3ChunkKeys[i], s3ChunkBufs[i])
incomingErrors <- a.PutObject(ctx, s3ChunkKeys[i], bytes.NewReader(s3ChunkBufs[i]))
}(i)
}

Expand All @@ -137,19 +150,8 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
return lastErr
}

func (a s3ObjectClient) putS3Chunk(ctx context.Context, key string, buf []byte) error {
return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
_, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Body: bytes.NewReader(buf),
Bucket: aws.String(a.bucketFromKey(key)),
Key: aws.String(key),
})
return err
})
}

// bucketFromKey maps a key to a bucket name
func (a s3ObjectClient) bucketFromKey(key string) string {
func (a *S3ObjectClient) bucketFromKey(key string) string {
if len(a.bucketNames) == 0 {
return ""
}
Expand All @@ -160,3 +162,81 @@ func (a s3ObjectClient) bucketFromKey(key string) string {

return a.bucketNames[hash%uint32(len(a.bucketNames))]
}

// GetObject fetches the object stored under objectKey and returns a reader
// over its contents. The caller is responsible for closing the reader.
func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
	var body io.ReadCloser

	// Resolve which bucket this key hashes to.
	bucket := a.bucketFromKey(objectKey)

	if err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
		resp, err := a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(objectKey),
		})
		if err != nil {
			return err
		}
		body = resp.Body
		return nil
	}); err != nil {
		return nil, err
	}

	return body, nil
}

// PutObject stores the contents of object under objectKey, instrumenting the
// request under the "S3.PutObject" label.
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
	putRequest := func(ctx context.Context) error {
		input := s3.PutObjectInput{
			Body:   object,
			Bucket: aws.String(a.bucketFromKey(objectKey)),
			Key:    aws.String(objectKey),
		}
		_, err := a.S3.PutObjectWithContext(ctx, &input)
		return err
	}
	return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, putRequest)
}

// List returns the objects stored under prefix in every configured bucket,
// non-recursively: chunk.DirDelim is passed as the delimiter so entries below
// a further delimiter are collapsed. Pagination is followed until each bucket
// is exhausted. Returns on the first error from any bucket.
func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) {
	var storageObjects []chunk.StorageObject

	for i := range a.bucketNames {
		err := instrument.CollectedRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
			input := s3.ListObjectsV2Input{
				Bucket:    aws.String(a.bucketNames[i]),
				Prefix:    aws.String(prefix),
				Delimiter: aws.String(chunk.DirDelim),
			}

			for {
				output, err := a.S3.ListObjectsV2WithContext(ctx, &input)
				if err != nil {
					return err
				}

				for _, content := range output.Contents {
					storageObjects = append(storageObjects, chunk.StorageObject{
						// Use the SDK's nil-safe dereference helpers: response
						// fields are pointers and may be nil.
						Key:        aws.StringValue(content.Key),
						ModifiedAt: aws.TimeValue(content.LastModified),
					})
				}

				// IsTruncated may be nil; treat nil as "no more pages"
				// rather than panicking on a bare dereference.
				if !aws.BoolValue(output.IsTruncated) {
					break
				}

				input.SetContinuationToken(aws.StringValue(output.NextContinuationToken))
			}

			return nil
		})

		if err != nil {
			return nil, err
		}
	}

	return storageObjects, nil
}
Loading

0 comments on commit 7279221

Please sign in to comment.