diff --git a/pkg/storage/bucket/azure/bucket_client.go b/pkg/storage/bucket/azure/bucket_client.go index c7d1e580bcca2..0cd5e6b3bacff 100644 --- a/pkg/storage/bucket/azure/bucket_client.go +++ b/pkg/storage/bucket/azure/bucket_client.go @@ -1,39 +1,37 @@ package azure import ( + "net/http" + "github.com/go-kit/log" - "github.com/prometheus/common/model" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/azure" - yaml "gopkg.in/yaml.v2" ) func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { - bucketConfig := azure.Config{ - StorageAccountName: cfg.StorageAccountName, - StorageAccountKey: cfg.StorageAccountKey.String(), - StorageConnectionString: cfg.ConnectionString.String(), - ContainerName: cfg.ContainerName, - Endpoint: cfg.EndpointSuffix, - MaxRetries: cfg.MaxRetries, - HTTPConfig: azure.HTTPConfig{ - IdleConnTimeout: model.Duration(cfg.IdleConnTimeout), - ResponseHeaderTimeout: model.Duration(cfg.ResponseHeaderTimeout), - InsecureSkipVerify: cfg.InsecureSkipVerify, - TLSHandshakeTimeout: model.Duration(cfg.TLSHandshakeTimeout), - ExpectContinueTimeout: model.Duration(cfg.ExpectContinueTimeout), - MaxIdleConns: cfg.MaxIdleConns, - MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, - MaxConnsPerHost: cfg.MaxConnsPerHost, - }, + return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig) +} + +func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) { + // Start with default config to make sure that all parameters are set to sensible values, especially + // HTTP Config field. 
+ bucketConfig := azure.DefaultConfig + bucketConfig.StorageAccountName = cfg.StorageAccountName + bucketConfig.StorageAccountKey = cfg.StorageAccountKey.String() + bucketConfig.StorageConnectionString = cfg.StorageConnectionString.String() + bucketConfig.ContainerName = cfg.ContainerName + bucketConfig.MaxRetries = cfg.MaxRetries + bucketConfig.UserAssignedID = cfg.UserAssignedID + + if cfg.Endpoint != "" { + // azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided. + bucketConfig.Endpoint = cfg.Endpoint } - // Thanos currently doesn't support passing the config as is, but expects a YAML, - // so we're going to serialize it. - serialized, err := yaml.Marshal(bucketConfig) - if err != nil { - return nil, err + var rt http.RoundTripper + if cfg.Transport != nil { + rt = cfg.Transport } - return azure.NewBucket(logger, serialized, name, nil) + return factory(logger, bucketConfig, name, rt) } diff --git a/pkg/storage/bucket/azure/config.go b/pkg/storage/bucket/azure/config.go index 928503190d931..ac8037b6b7819 100644 --- a/pkg/storage/bucket/azure/config.go +++ b/pkg/storage/bucket/azure/config.go @@ -2,22 +2,23 @@ package azure import ( "flag" + "net/http" "github.com/grafana/dskit/flagext" - - "github.com/grafana/loki/v3/pkg/storage/bucket/http" ) // Config holds the config options for an Azure backend type Config struct { - StorageAccountName string `yaml:"account_name"` - StorageAccountKey flagext.Secret `yaml:"account_key"` - ConnectionString flagext.Secret `yaml:"connection_string"` - ContainerName string `yaml:"container_name"` - EndpointSuffix string `yaml:"endpoint_suffix"` - MaxRetries int `yaml:"max_retries"` + StorageAccountName string `yaml:"account_name"` + StorageAccountKey flagext.Secret `yaml:"account_key"` + StorageConnectionString flagext.Secret `yaml:"connection_string"` + ContainerName string `yaml:"container_name"` + Endpoint string `yaml:"endpoint_suffix"` + MaxRetries int `yaml:"max_retries"` 
+ UserAssignedID string `yaml:"user_assigned_id"` - http.Config `yaml:"http"` + // Allow upstream callers to inject a round tripper + Transport http.RoundTripper `yaml:"-"` } // RegisterFlags registers the flags for Azure storage @@ -28,10 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix registers the flags for Azure storage func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name") - f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key") - f.Var(&cfg.ConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the values of `account-name` and `endpoint-suffix` values will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.") - f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name") - f.StringVar(&cfg.EndpointSuffix, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN") + f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key. If unset, Azure managed identities will be used for authentication instead.") + f.Var(&cfg.StorageConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the value of `endpoint-suffix` will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.") + f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name") + f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN. 
If set to empty string, default endpoint suffix is used.") f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors") - cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f) + f.StringVar(&cfg.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned managed identity. If empty, then System assigned identity is used.") } diff --git a/pkg/storage/bucket/azure/config_test.go b/pkg/storage/bucket/azure/config_test.go deleted file mode 100644 index 82357faa147e4..0000000000000 --- a/pkg/storage/bucket/azure/config_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package azure - -import ( - "testing" - "time" - - "github.com/grafana/dskit/flagext" - "github.com/stretchr/testify/require" - yaml "gopkg.in/yaml.v2" - - "github.com/grafana/loki/v3/pkg/storage/bucket/http" -) - -// defaultConfig should match the default flag values defined in RegisterFlagsWithPrefix. -var defaultConfig = Config{ - ContainerName: "loki", - MaxRetries: 20, - Config: http.Config{ - IdleConnTimeout: 90 * time.Second, - ResponseHeaderTimeout: 2 * time.Minute, - InsecureSkipVerify: false, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, - MaxConnsPerHost: 0, - }, -} - -func TestConfig(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - config string - expectedConfig Config - expectedErr error - }{ - "default config": { - config: "", - expectedConfig: defaultConfig, - expectedErr: nil, - }, - "custom config": { - config: ` -account_name: test-account-name -account_key: test-account-key -connection_string: test-connection-string -container_name: test-container-name -endpoint_suffix: test-endpoint-suffix -max_retries: 1 -http: - idle_conn_timeout: 2s - response_header_timeout: 3s - insecure_skip_verify: true - tls_handshake_timeout: 4s - expect_continue_timeout: 5s - max_idle_connections: 6 - max_idle_connections_per_host: 7 - max_connections_per_host: 8 -`, - 
expectedConfig: Config{ - StorageAccountName: "test-account-name", - StorageAccountKey: flagext.SecretWithValue("test-account-key"), - ConnectionString: flagext.SecretWithValue("test-connection-string"), - ContainerName: "test-container-name", - EndpointSuffix: "test-endpoint-suffix", - MaxRetries: 1, - Config: http.Config{ - IdleConnTimeout: 2 * time.Second, - ResponseHeaderTimeout: 3 * time.Second, - InsecureSkipVerify: true, - TLSHandshakeTimeout: 4 * time.Second, - ExpectContinueTimeout: 5 * time.Second, - MaxIdleConns: 6, - MaxIdleConnsPerHost: 7, - MaxConnsPerHost: 8, - }, - }, - expectedErr: nil, - }, - "invalid type": { - config: `max_retries: foo`, - expectedConfig: defaultConfig, - expectedErr: &yaml.TypeError{Errors: []string{"line 1: cannot unmarshal !!str `foo` into int"}}, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - cfg := Config{} - flagext.DefaultValues(&cfg) - - err := yaml.Unmarshal([]byte(testData.config), &cfg) - require.Equal(t, testData.expectedErr, err) - require.Equal(t, testData.expectedConfig, cfg) - }) - } -} diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go new file mode 100644 index 0000000000000..094f0ad2ea7ac --- /dev/null +++ b/pkg/storage/bucket/object_client_adapter.go @@ -0,0 +1,150 @@ +package bucket + +import ( + "context" + "io" + "strings" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/storage/chunk/client" +) + +type ObjectClientAdapter struct { + bucket, hedgedBucket objstore.Bucket + logger log.Logger + isRetryableErr func(err error) bool +} + +func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter { + if hedgedBucket == nil { + hedgedBucket = bucket + } + + o := &ObjectClientAdapter{ + bucket: bucket, + hedgedBucket: hedgedBucket, + logger: 
log.With(logger, "component", "bucket_to_object_client_adapter"), + // default to no retryable errors. Override with WithRetryableErrFunc + isRetryableErr: func(_ error) bool { + return false + }, + } + + for _, opt := range opts { + opt(o) + } + + return o +} + +type ClientOptions func(*ObjectClientAdapter) + +func WithRetryableErrFunc(f func(err error) bool) ClientOptions { + return func(o *ObjectClientAdapter) { + o.isRetryableErr = f + } +} + +func (o *ObjectClientAdapter) Stop() { +} + +// ObjectExists checks if a given objectKey exists in the bucket +func (o *ObjectClientAdapter) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + return o.bucket.Exists(ctx, objectKey) +} + +// GetAttributes returns the attributes of the specified object key from the configured bucket. +func (o *ObjectClientAdapter) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) { + attr := client.ObjectAttributes{} + thanosAttr, err := o.hedgedBucket.Attributes(ctx, objectKey) + if err != nil { + return attr, err + } + + attr.Size = thanosAttr.Size + return attr, nil +} + +// PutObject puts the specified bytes into the configured bucket at the provided key +func (o *ObjectClientAdapter) PutObject(ctx context.Context, objectKey string, object io.Reader) error { + return o.bucket.Upload(ctx, objectKey, object) +} + +// GetObject returns a reader and the size for the specified object key from the configured bucket. +// size is set to -1 if it cannot be successfully determined; it is up to the caller to check this value before using it. 
+func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { + reader, err := o.hedgedBucket.Get(ctx, objectKey) + if err != nil { + return nil, 0, err + } + + size, err := objstore.TryToGetSize(reader) + if err != nil { + size = -1 + level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err) + } + + return reader, size, err +} + +func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + return o.hedgedBucket.GetRange(ctx, objectKey, offset, length) +} + +// List objects with given prefix. +func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + var storageObjects []client.StorageObject + var commonPrefixes []client.StorageCommonPrefix + var iterParams []objstore.IterOption + + // If delimiter is empty we want to list all files + if delimiter == "" { + iterParams = append(iterParams, objstore.WithRecursiveIter) + } + + err := o.bucket.Iter(ctx, prefix, func(objectKey string) error { + // CommonPrefixes are keys that have the prefix and have the delimiter + // as a suffix + if delimiter != "" && strings.HasSuffix(objectKey, delimiter) { + commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey)) + return nil + } + + // TODO: remove this once thanos support IterWithAttributes + attr, err := o.bucket.Attributes(ctx, objectKey) + if err != nil { + return errors.Wrapf(err, "failed to get attributes for %s", objectKey) + } + + storageObjects = append(storageObjects, client.StorageObject{ + Key: objectKey, + ModifiedAt: attr.LastModified, + }) + + return nil + }, iterParams...) + if err != nil { + return nil, nil, err + } + + return storageObjects, commonPrefixes, nil +} + +// DeleteObject deletes the specified object key from the configured bucket. 
+func (o *ObjectClientAdapter) DeleteObject(ctx context.Context, objectKey string) error { + return o.bucket.Delete(ctx, objectKey) +} + +// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. +func (o *ObjectClientAdapter) IsObjectNotFoundErr(err error) bool { + return o.bucket.IsObjNotFoundErr(err) +} + +// IsRetryableErr returns true if the request failed due to some retryable server-side scenario +func (o *ObjectClientAdapter) IsRetryableErr(err error) bool { + return o.isRetryableErr(err) +} diff --git a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client_test.go b/pkg/storage/bucket/object_client_adapter_test.go similarity index 92% rename from pkg/storage/chunk/client/gcp/gcs_thanos_object_client_test.go rename to pkg/storage/bucket/object_client_adapter_test.go index d8b824a6e5d04..1ce6de26856bf 100644 --- a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client_test.go +++ b/pkg/storage/bucket/object_client_adapter_test.go @@ -1,4 +1,4 @@ -package gcp +package bucket import ( "bytes" @@ -12,7 +12,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client" ) -func TestGCSThanosObjStore_List(t *testing.T) { +func TestObjectClientAdapter_List(t *testing.T) { tests := []struct { name string prefix string @@ -95,10 +95,10 @@ func TestGCSThanosObjStore_List(t *testing.T) { require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff)) require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff)) - gcpClient := &GCSThanosObjectClient{} - gcpClient.client = newBucket + client := NewObjectClientAdapter(newBucket, nil, nil) + client.bucket = newBucket - storageObj, storageCommonPref, err := gcpClient.List(context.Background(), tt.prefix, tt.delimiter) + storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter) if tt.wantErr != nil { require.Equal(t, tt.wantErr.Error(), err.Error()) continue 
diff --git a/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go b/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go new file mode 100644 index 0000000000000..4bf2137433064 --- /dev/null +++ b/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go @@ -0,0 +1,44 @@ +package azure + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/storage/bucket" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" +) + +// NewBlobStorageThanosObjectClient makes a new BlobStorage-backed ObjectClient. +func NewBlobStorageThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { + b, err := newBlobStorageThanosObjClient(ctx, cfg, component, logger, false, hedgingCfg) + if err != nil { + return nil, err + } + + var hedged objstore.Bucket + if hedgingCfg.At != 0 { + hedged, err = newBlobStorageThanosObjClient(ctx, cfg, component, logger, true, hedgingCfg) + if err != nil { + return nil, err + } + } + + return bucket.NewObjectClientAdapter(b, hedged, logger), nil +} + +func newBlobStorageThanosObjClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { + if hedging { + hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) + if err != nil { + return nil, err + } + + cfg.Azure.Transport = hedgedTrasport + } + + return bucket.NewClient(ctx, bucket.Azure, cfg, component, logger) +} diff --git a/pkg/storage/chunk/client/gcp/gcs_object_client.go b/pkg/storage/chunk/client/gcp/gcs_object_client.go index 9b05b57404c49..1d44659b3f3cc 100644 --- a/pkg/storage/chunk/client/gcp/gcs_object_client.go 
+++ b/pkg/storage/chunk/client/gcp/gcs_object_client.go @@ -279,7 +279,7 @@ func isContextErr(err error) bool { } // IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts. -func (s *GCSObjectClient) IsStorageTimeoutErr(err error) bool { +func IsStorageTimeoutErr(err error) bool { // TODO(dannyk): move these out to be generic // context errors are all client-side if isContextErr(err) { @@ -315,7 +315,7 @@ func (s *GCSObjectClient) IsStorageTimeoutErr(err error) bool { } // IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling. -func (s *GCSObjectClient) IsStorageThrottledErr(err error) bool { +func IsStorageThrottledErr(err error) bool { if gerr, ok := err.(*googleapi.Error); ok { // https://cloud.google.com/storage/docs/retry-strategy return gerr.Code == http.StatusTooManyRequests || @@ -325,9 +325,14 @@ func (s *GCSObjectClient) IsStorageThrottledErr(err error) bool { return false } +// IsRetryableErr returns true if the request failed due to some retryable server-side scenario +func IsRetryableErr(err error) bool { + return IsStorageTimeoutErr(err) || IsStorageThrottledErr(err) +} + // IsRetryableErr returns true if the request failed due to some retryable server-side scenario func (s *GCSObjectClient) IsRetryableErr(err error) bool { - return s.IsStorageTimeoutErr(err) || s.IsStorageThrottledErr(err) + return IsRetryableErr(err) } func gcsTransport(ctx context.Context, scope string, insecure bool, http2 bool, serviceAccount flagext.Secret) (http.RoundTripper, error) { diff --git a/pkg/storage/chunk/client/gcp/gcs_object_client_test.go b/pkg/storage/chunk/client/gcp/gcs_object_client_test.go index c885c4c1d780c..a0e6313f7ce43 100644 --- a/pkg/storage/chunk/client/gcp/gcs_object_client_test.go +++ b/pkg/storage/chunk/client/gcp/gcs_object_client_test.go @@ -147,8 +147,8 @@ func TestUpstreamRetryableErrs(t *testing.T) { require.NoError(t, err) _, 
_, err = cli.GetObject(ctx, "foo") - require.Equal(t, tc.isThrottledErr, cli.IsStorageThrottledErr(err)) - require.Equal(t, tc.isTimeoutErr, cli.IsStorageTimeoutErr(err)) + require.Equal(t, tc.isThrottledErr, IsStorageThrottledErr(err)) + require.Equal(t, tc.isTimeoutErr, IsStorageTimeoutErr(err)) }) } } @@ -229,7 +229,7 @@ func TestTCPErrs(t *testing.T) { _, _, err = cli.GetObject(ctx, "foo") require.Error(t, err) - require.Equal(t, tc.retryable, cli.IsStorageTimeoutErr(err)) + require.Equal(t, tc.retryable, IsStorageTimeoutErr(err)) }) } } diff --git a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go index af0ae55b82cc2..b4190be2d6943 100644 --- a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go +++ b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go @@ -2,54 +2,32 @@ package gcp import ( "context" - "io" - "net" - "net/http" - "strings" "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" - "google.golang.org/api/googleapi" - amnet "k8s.io/apimachinery/pkg/util/net" "github.com/grafana/loki/v3/pkg/storage/bucket" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" ) -type GCSThanosObjectClient struct { - client objstore.Bucket - hedgedClient objstore.Bucket - logger log.Logger -} - -func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (*GCSThanosObjectClient, error) { - client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) +func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { + b, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) if err != nil { return nil, 
err } - if hedgingCfg.At == 0 { - return &GCSThanosObjectClient{ - client: client, - hedgedClient: client, - logger: logger, - }, nil - } - - hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) - if err != nil { - return nil, err + var hedged objstore.Bucket + if hedgingCfg.At != 0 { + hedged, err = newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) + if err != nil { + return nil, err + } } - return &GCSThanosObjectClient{ - client: client, - hedgedClient: hedgedClient, - logger: logger, - }, nil + o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr)) + return o, nil } func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { @@ -64,150 +42,3 @@ func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component return bucket.NewClient(ctx, bucket.GCS, cfg, component, logger) } - -func (s *GCSThanosObjectClient) Stop() { -} - -// ObjectExists checks if a given objectKey exists in the GCS bucket -func (s *GCSThanosObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { - return s.client.Exists(ctx, objectKey) -} - -// GetAttributes returns the attributes of the specified object key from the configured GCS bucket. 
-func (s *GCSThanosObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) { - attr := client.ObjectAttributes{} - thanosAttr, err := s.hedgedClient.Attributes(ctx, objectKey) - if err != nil { - return attr, err - } - - attr.Size = thanosAttr.Size - return attr, nil -} - -// PutObject puts the specified bytes into the configured GCS bucket at the provided key -func (s *GCSThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { - return s.client.Upload(ctx, objectKey, object) -} - -// GetObject returns a reader and the size for the specified object key from the configured GCS bucket. -// size is set to -1 if it cannot be succefully determined, it is up to the caller to check this value before using it. -func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { - reader, err := s.hedgedClient.Get(ctx, objectKey) - if err != nil { - return nil, 0, err - } - - size, err := objstore.TryToGetSize(reader) - if err != nil { - size = -1 - level.Warn(s.logger).Log("msg", "failed to get size of object", "err", err) - } - - return reader, size, err -} - -func (s *GCSThanosObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { - return s.hedgedClient.GetRange(ctx, objectKey, offset, length) -} - -// List objects with given prefix. 
-func (s *GCSThanosObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { - var storageObjects []client.StorageObject - var commonPrefixes []client.StorageCommonPrefix - var iterParams []objstore.IterOption - - // If delimiter is empty we want to list all files - if delimiter == "" { - iterParams = append(iterParams, objstore.WithRecursiveIter) - } - - err := s.client.Iter(ctx, prefix, func(objectKey string) error { - // CommonPrefixes are keys that have the prefix and have the delimiter - // as a suffix - if delimiter != "" && strings.HasSuffix(objectKey, delimiter) { - commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey)) - return nil - } - - // TODO: remove this once thanos support IterWithAttributes - attr, err := s.client.Attributes(ctx, objectKey) - if err != nil { - return errors.Wrapf(err, "failed to get attributes for %s", objectKey) - } - - storageObjects = append(storageObjects, client.StorageObject{ - Key: objectKey, - ModifiedAt: attr.LastModified, - }) - - return nil - }, iterParams...) - if err != nil { - return nil, nil, err - } - - return storageObjects, commonPrefixes, nil -} - -// DeleteObject deletes the specified object key from the configured GCS bucket. -func (s *GCSThanosObjectClient) DeleteObject(ctx context.Context, objectKey string) error { - return s.client.Delete(ctx, objectKey) -} - -// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. -func (s *GCSThanosObjectClient) IsObjectNotFoundErr(err error) bool { - return s.client.IsObjNotFoundErr(err) -} - -// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts. 
-func (s *GCSThanosObjectClient) IsStorageTimeoutErr(err error) bool { - // TODO(dannyk): move these out to be generic - // context errors are all client-side - if isContextErr(err) { - // Go 1.23 changed the type of the error returned by the http client when a timeout occurs - // while waiting for headers. This is a server side timeout. - return strings.Contains(err.Error(), "Client.Timeout") - } - - // connection misconfiguration, or writing on a closed connection - // do NOT retry; this is not a server-side issue - if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) { - return false - } - - // this is a server-side timeout - if isTimeoutError(err) { - return true - } - - // connection closed (closed before established) or reset (closed after established) - // this is a server-side issue - if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) { - return true - } - - if gerr, ok := err.(*googleapi.Error); ok { - // https://cloud.google.com/storage/docs/retry-strategy - return gerr.Code == http.StatusRequestTimeout || - gerr.Code == http.StatusGatewayTimeout - } - - return false -} - -// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling. 
-func (s *GCSThanosObjectClient) IsStorageThrottledErr(err error) bool { - if gerr, ok := err.(*googleapi.Error); ok { - // https://cloud.google.com/storage/docs/retry-strategy - return gerr.Code == http.StatusTooManyRequests || - (gerr.Code/100 == 5) // all 5xx errors are retryable - } - - return false -} - -// IsRetryableErr returns true if the request failed due to some retryable server-side scenario -func (s *GCSThanosObjectClient) IsRetryableErr(err error) bool { - return s.IsStorageTimeoutErr(err) || s.IsStorageThrottledErr(err) -} diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 69a7693fbe83e..79135abd26d00 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -697,6 +697,9 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr } azureCfg = (azure.BlobStorageConfig)(nsCfg) } + if cfg.UseThanosObjstore { + return azure.NewBlobStorageThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) + } return azure.NewBlobStorage(&azureCfg, clientMetrics.AzureMetrics, cfg.Hedging) case types.StorageTypeSwift: