-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(storage): Azure backend using thanos.io/objstore #11315
Merged
Merged
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
43bba36
Groundwork for azure implementation
JoaoBraveCoding b93fbcd
Fixed configuration for bucket/azure
JoaoBraveCoding a259abd
Added support for BlobStorage using thanos/objstore
JoaoBraveCoding 42f37ba
Azure CLI review
JoaoBraveCoding 40aa2f9
Fixes from testing
JoaoBraveCoding e394042
Merge branch 'main' into log-4550-azure
JoaoBraveCoding 02218a4
Merge branch 'main' into log-4550-azure
JoaoBraveCoding 6db3b13
Merge branch 'main' into log-4550-azure
JoaoBraveCoding 4399687
Merge branch 'main' into log-4550-azure
JoaoBraveCoding 1386834
Merge branch 'main' into log-4550-azure
ashwanthgoli 679c2a8
clean-up deleted files from main
ashwanthgoli 06f2280
config parity
ashwanthgoli 8d92fbd
add missing methods to azure thanos adapter
ashwanthgoli 76c41fe
use objectclient adapter
ashwanthgoli 0557d69
lint
ashwanthgoli a57c140
make format
ashwanthgoli a7da0f4
remove gcs comment
ashwanthgoli e7d4b60
make retryableFunc an option
ashwanthgoli c8d337c
review suggestions
ashwanthgoli a11f037
Merge branch 'main' into log-4550-azure
ashwanthgoli File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,39 +1,37 @@ | ||
package azure | ||
|
||
import ( | ||
"net/http" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/prometheus/common/model" | ||
"github.com/thanos-io/objstore" | ||
"github.com/thanos-io/objstore/providers/azure" | ||
yaml "gopkg.in/yaml.v2" | ||
) | ||
|
||
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { | ||
bucketConfig := azure.Config{ | ||
StorageAccountName: cfg.StorageAccountName, | ||
StorageAccountKey: cfg.StorageAccountKey.String(), | ||
StorageConnectionString: cfg.ConnectionString.String(), | ||
ContainerName: cfg.ContainerName, | ||
Endpoint: cfg.EndpointSuffix, | ||
MaxRetries: cfg.MaxRetries, | ||
HTTPConfig: azure.HTTPConfig{ | ||
IdleConnTimeout: model.Duration(cfg.IdleConnTimeout), | ||
ResponseHeaderTimeout: model.Duration(cfg.ResponseHeaderTimeout), | ||
InsecureSkipVerify: cfg.InsecureSkipVerify, | ||
TLSHandshakeTimeout: model.Duration(cfg.TLSHandshakeTimeout), | ||
ExpectContinueTimeout: model.Duration(cfg.ExpectContinueTimeout), | ||
MaxIdleConns: cfg.MaxIdleConns, | ||
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, | ||
MaxConnsPerHost: cfg.MaxConnsPerHost, | ||
}, | ||
return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig) | ||
} | ||
|
||
func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) { | ||
// Start with default config to make sure that all parameters are set to sensible values, especially | ||
// HTTP Config field. | ||
bucketConfig := azure.DefaultConfig | ||
bucketConfig.StorageAccountName = cfg.StorageAccountName | ||
bucketConfig.StorageAccountKey = cfg.StorageAccountKey.String() | ||
bucketConfig.StorageConnectionString = cfg.StorageConnectionString.String() | ||
bucketConfig.ContainerName = cfg.ContainerName | ||
bucketConfig.MaxRetries = cfg.MaxRetries | ||
bucketConfig.UserAssignedID = cfg.UserAssignedID | ||
|
||
if cfg.Endpoint != "" { | ||
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided. | ||
bucketConfig.Endpoint = cfg.Endpoint | ||
} | ||
|
||
// Thanos currently doesn't support passing the config as is, but expects a YAML, | ||
// so we're going to serialize it. | ||
serialized, err := yaml.Marshal(bucketConfig) | ||
if err != nil { | ||
return nil, err | ||
var rt http.RoundTripper | ||
if cfg.Transport != nil { | ||
rt = cfg.Transport | ||
} | ||
|
||
return azure.NewBucket(logger, serialized, name, nil) | ||
return factory(logger, bucketConfig, name, rt) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package bucket | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"strings" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/pkg/errors" | ||
|
||
"github.com/thanos-io/objstore" | ||
|
||
"github.com/grafana/loki/v3/pkg/storage/chunk/client" | ||
) | ||
|
||
type ObjectClientAdapter struct { | ||
bucket, hedgedBucket objstore.Bucket | ||
logger log.Logger | ||
isRetryableErr func(err error) bool | ||
} | ||
|
||
func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger) *ObjectClientAdapter { | ||
if hedgedBucket == nil { | ||
hedgedBucket = bucket | ||
} | ||
|
||
return &ObjectClientAdapter{ | ||
bucket: bucket, | ||
hedgedBucket: hedgedBucket, | ||
logger: log.With(logger, "component", "bucket_to_object_client_adapter"), | ||
// default to no retryable errors. Override with WithRetryableErrFunc | ||
isRetryableErr: func(_ error) bool { | ||
return false | ||
}, | ||
} | ||
} | ||
|
||
func WithRetryableErrFunc(f func(err error) bool) func(*ObjectClientAdapter) { | ||
return func(o *ObjectClientAdapter) { | ||
o.isRetryableErr = f | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Personally I find this is quite an unusual API. |
||
|
||
func (o *ObjectClientAdapter) Stop() { | ||
} | ||
|
||
// ObjectExists checks if a given objectKey exists in the bucket | ||
func (o *ObjectClientAdapter) ObjectExists(ctx context.Context, objectKey string) (bool, error) { | ||
return o.bucket.Exists(ctx, objectKey) | ||
} | ||
|
||
// GetAttributes returns the attributes of the specified object key from the configured bucket. | ||
func (o *ObjectClientAdapter) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) { | ||
attr := client.ObjectAttributes{} | ||
thanosAttr, err := o.hedgedBucket.Attributes(ctx, objectKey) | ||
if err != nil { | ||
return attr, err | ||
} | ||
|
||
attr.Size = thanosAttr.Size | ||
return attr, nil | ||
} | ||
|
||
// PutObject puts the specified bytes into the configured bucket at the provided key | ||
func (o *ObjectClientAdapter) PutObject(ctx context.Context, objectKey string, object io.Reader) error { | ||
return o.bucket.Upload(ctx, objectKey, object) | ||
} | ||
|
||
// GetObject returns a reader and the size for the specified object key from the configured bucket. | ||
// size is set to -1 if it cannot be succefully determined, it is up to the caller to check this value before using it. | ||
func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { | ||
reader, err := o.hedgedBucket.Get(ctx, objectKey) | ||
if err != nil { | ||
return nil, 0, err | ||
} | ||
|
||
size, err := objstore.TryToGetSize(reader) | ||
if err != nil { | ||
size = -1 | ||
level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err) | ||
} | ||
|
||
return reader, size, err | ||
} | ||
|
||
func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { | ||
return o.hedgedBucket.GetRange(ctx, objectKey, offset, length) | ||
} | ||
|
||
// List objects with given prefix. | ||
func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { | ||
var storageObjects []client.StorageObject | ||
var commonPrefixes []client.StorageCommonPrefix | ||
var iterParams []objstore.IterOption | ||
|
||
// If delimiter is empty we want to list all files | ||
if delimiter == "" { | ||
iterParams = append(iterParams, objstore.WithRecursiveIter) | ||
} | ||
|
||
err := o.bucket.Iter(ctx, prefix, func(objectKey string) error { | ||
// CommonPrefixes are keys that have the prefix and have the delimiter | ||
// as a suffix | ||
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) { | ||
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey)) | ||
return nil | ||
} | ||
|
||
// TODO: remove this once thanos support IterWithAttributes | ||
attr, err := o.bucket.Attributes(ctx, objectKey) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to get attributes for %s", objectKey) | ||
} | ||
|
||
storageObjects = append(storageObjects, client.StorageObject{ | ||
Key: objectKey, | ||
ModifiedAt: attr.LastModified, | ||
}) | ||
|
||
return nil | ||
}, iterParams...) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
return storageObjects, commonPrefixes, nil | ||
} | ||
|
||
// DeleteObject deletes the specified object key from the configured bucket. | ||
func (o *ObjectClientAdapter) DeleteObject(ctx context.Context, objectKey string) error { | ||
return o.bucket.Delete(ctx, objectKey) | ||
} | ||
|
||
// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. | ||
func (o *ObjectClientAdapter) IsObjectNotFoundErr(err error) bool { | ||
return o.bucket.IsObjNotFoundErr(err) | ||
} | ||
|
||
// IsRetryableErr returns true if the request failed due to some retryable server-side scenario | ||
func (o *ObjectClientAdapter) IsRetryableErr(err error) bool { | ||
return o.isRetryableErr(err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder why the linter did not catch this empty newline. Please run
make format