From 2427fab32d42117b37f7f72250ab5491628c134f Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 21 Oct 2021 11:58:08 -0600 Subject: [PATCH] Configuration: add a common config section for object storage (#4473) * add common config for object storage Signed-off-by: Trevor Whitney * wip Signed-off-by: Trevor Whitney * add test to compactor config override Signed-off-by: Trevor Whitney * update changelog Signed-off-by: Trevor Whitney * changes from PR review * rename object_store -> storage * add local/filesystem support to common object storage config --- CHANGELOG.md | 1 + pkg/loki/common/common.go | 19 +- pkg/loki/config_wrapper.go | 97 ++++ pkg/loki/config_wrapper_test.go | 439 +++++++++++++++++- pkg/storage/chunk/aws/s3_storage_client.go | 27 ++ .../chunk/azure/blob_storage_client.go | 17 + pkg/storage/chunk/gcp/gcs_object_client.go | 10 + pkg/storage/chunk/local/fs_object_client.go | 7 + .../chunk/openstack/swift_object_client.go | 7 + 9 files changed, 619 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c00a49f831d7..c6631fa6bb508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [4440](https://github.com/grafana/loki/pull/4440) **DylanGuedes**: Config: Override distributor's default ring KV store * [4443](https://github.com/grafana/loki/pull/4443) **DylanGuedes**: Loki: Change how push API checks for contentType * [4415](https://github.com/grafana/loki/pull/4415) **DylanGuedes**: Change default limits to common values +* [4473](https://github.com/grafana/loki/pull/4473) **trevorwhitney**: Config: add object storage configuration to common config # 2.3.0 (2021/08/06) diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index cce66321362cf..c0e11bf2f2abe 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -1,6 +1,23 @@ package common +import ( + "github.com/grafana/loki/pkg/storage/chunk/aws" + "github.com/grafana/loki/pkg/storage/chunk/azure" + 
"github.com/grafana/loki/pkg/storage/chunk/gcp" + "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/openstack" +) + // Config holds common config that can be shared between multiple other config sections type Config struct { - PathPrefix string `yaml:"path_prefix"` + PathPrefix string `yaml:"path_prefix"` + Storage Storage `yaml:"storage"` +} + +type Storage struct { + S3 *aws.S3Config `yaml:"s3"` + GCS *gcp.GCSConfig `yaml:"gcs"` + Azure *azure.BlobStorageConfig `yaml:"azure"` + Swift *openstack.SwiftConfig `yaml:"swift"` + FSConfig *local.FSConfig `yaml:"filesystem"` } diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index d7df5455a475f..ecacd94ce8a52 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -3,10 +3,12 @@ package loki import ( "flag" "fmt" + "reflect" "github.com/grafana/dskit/flagext" "github.com/pkg/errors" + "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/util/cfg" ) @@ -72,6 +74,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { } applyMemberlistConfig(r) + applyStorageConfig(r, &defaults) return nil } @@ -89,3 +92,97 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.Ruler.Ring.KVStore.Store = memberlistStr } } + +// applyStorageConfig will attempt to apply a common storage config for either +// s3, gcs, azure, or swift to all the places we create an object storage client. +// If any specific configs for an object storage client have been provided elsewhere in the +// configuration file, applyStorageConfig will not override them. +// If multiple storage configurations are provided, applyStorageConfig will apply +// all of them, and will set the value for the Ruler's StoreConfig `type` to the +// last one (alphabetically) that was defined. 
+func applyStorageConfig(cfg, defaults *ConfigWrapper) { + rulerStoreConfigsToApply := make([]func(*ConfigWrapper), 0, 4) + chunkStorageConfigsToApply := make([]func(*ConfigWrapper), 0, 4) + + if cfg.Common.Storage.Azure != nil { + rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) { + r.Ruler.StoreConfig.Type = "azure" + r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure.ToCortexAzureConfig() + }) + + chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) { + r.StorageConfig.AzureStorageConfig = *r.Common.Storage.Azure + r.CompactorConfig.SharedStoreType = storage.StorageTypeAzure + }) + } + + if cfg.Common.Storage.GCS != nil { + rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) { + r.Ruler.StoreConfig.Type = "gcs" + r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig() + }) + + chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) { + r.StorageConfig.GCSConfig = *r.Common.Storage.GCS + r.CompactorConfig.SharedStoreType = storage.StorageTypeGCS + }) + } + + if cfg.Common.Storage.FSConfig != nil { + rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) { + r.Ruler.StoreConfig.Type = "local" + r.Ruler.StoreConfig.Local = r.Common.Storage.FSConfig.ToCortexLocalConfig() + }) + + chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) { + r.StorageConfig.FSConfig = *r.Common.Storage.FSConfig + r.CompactorConfig.SharedStoreType = storage.StorageTypeFileSystem + }) + } + + if cfg.Common.Storage.S3 != nil { + rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) { + r.Ruler.StoreConfig.Type = "s3" + r.Ruler.StoreConfig.S3 = r.Common.Storage.S3.ToCortexS3Config() + }) + + chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) { + r.StorageConfig.AWSStorageConfig.S3Config = *r.Common.Storage.S3 + 
r.CompactorConfig.SharedStoreType = storage.StorageTypeS3 + }) + } + + if cfg.Common.Storage.Swift != nil { + rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) { + r.Ruler.StoreConfig.Type = "swift" + r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift.ToCortexSwiftConfig() + }) + + chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) { + r.StorageConfig.Swift = *r.Common.Storage.Swift + r.CompactorConfig.SharedStoreType = storage.StorageTypeSwift + }) + } + + // store change funcs in slices and apply all at once, because once we change the + // config we can no longer compare it to the default, this allows us to only + // do that comparison once + applyRulerStoreConfigs(cfg, defaults, rulerStoreConfigsToApply) + applyChunkStorageConfigs(cfg, defaults, chunkStorageConfigsToApply) +} + +func applyRulerStoreConfigs(cfg, defaults *ConfigWrapper, apply []func(*ConfigWrapper)) { + if reflect.DeepEqual(cfg.Ruler.StoreConfig, defaults.Ruler.StoreConfig) { + for _, ap := range apply { + ap(cfg) + } + } +} + +func applyChunkStorageConfigs(cfg, defaults *ConfigWrapper, apply []func(*ConfigWrapper)) { + if reflect.DeepEqual(cfg.StorageConfig, defaults.StorageConfig) { + for _, ap := range apply { + ap(cfg) + } + } +} diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index ffaa23039f45a..a8f458ae3eee7 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -3,12 +3,21 @@ package loki import ( "flag" "io/ioutil" + "net/url" "os" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws" + cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure" + cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp" + cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local" + cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift" + + 
"github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/util/cfg" ) @@ -43,10 +52,15 @@ func Test_CommonConfig(t *testing.T) { return config, defaults } + //the unmarshaller overwrites default values with 0s when a completely empty + //config file is passed, so our "empty" config has some non-relevant config in it + const emptyConfigString = `--- +server: + http_listen_port: 80` + t.Run("common path prefix config", func(t *testing.T) { t.Run("does not override defaults for file paths when not provided", func(t *testing.T) { - configFileString := `---` - config, defaults := testContext(configFileString, nil) + config, defaults := testContext(emptyConfigString, nil) assert.EqualValues(t, defaults.Ruler.RulePath, config.Ruler.RulePath) assert.EqualValues(t, defaults.Ingester.WAL.Dir, config.Ingester.WAL.Dir) @@ -92,8 +106,7 @@ common: // * ruler t.Run("does not automatically configure memberlist when no top-level memberlist config is provided", func(t *testing.T) { - configFileString := `---` - config, defaults := testContext(configFileString, nil) + config, defaults := testContext(emptyConfigString, nil) assert.EqualValues(t, defaults.Ingester.LifecyclerConfig.RingConfig.KVStore.Store, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) assert.EqualValues(t, defaults.Distributor.DistributorRing.KVStore.Store, config.Distributor.DistributorRing.KVStore.Store) @@ -145,6 +158,424 @@ memberlist: assert.EqualValues(t, memberlistStr, config.Distributor.DistributorRing.KVStore.Store) }) }) + + t.Run("common object store config", func(t *testing.T) { + //config file structure + //common: + // storage: + // azure: azure.BlobStorageConfig + // gcs: gcp.GCSConfig + // s3: aws.S3Config + // swift: openstack.SwiftConfig + + t.Run("does not automatically configure cloud object storage", func(t *testing.T) { + config, defaults := testContext(emptyConfigString, nil) + + assert.EqualValues(t, defaults.Ruler.StoreConfig.Type, 
config.Ruler.StoreConfig.Type) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure) + assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS) + assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local) + + assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig, config.StorageConfig.AWSStorageConfig) + assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig) + assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig) + assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift) + assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig) + }) + + t.Run("when multiple configs are provided, the last (alphabetically) is used as the ruler store type", func(t *testing.T) { + multipleConfig := `common: + storage: + s3: + s3: s3://foo-bucket/example + endpoint: s3://foo-bucket + region: us-east1 + access_key_id: abc123 + secret_access_key: def789 + gcs: + bucket_name: foobar + chunk_buffer_size: 27 + request_timeout: 5m` + + config, _ := testContext(multipleConfig, nil) + assert.Equal(t, "s3", config.Ruler.StoreConfig.Type) + + assert.Equal(t, "s3://foo-bucket", config.Ruler.StoreConfig.S3.Endpoint) + assert.Equal(t, "foobar", config.Ruler.StoreConfig.GCS.BucketName) + + assert.Equal(t, "s3://foo-bucket", config.StorageConfig.AWSStorageConfig.S3Config.Endpoint) + assert.Equal(t, "foobar", config.StorageConfig.GCSConfig.BucketName) + }) + + t.Run("when common s3 storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) { + s3Config := `common: + storage: + s3: + s3: s3://foo-bucket/example + endpoint: s3://foo-bucket + region: us-east1 + 
access_key_id: abc123 + secret_access_key: def789 + insecure: true + signature_version: v4 + http_config: + idle_conn_timeout: 5m + response_header_timeout: 5m` + + config, defaults := testContext(s3Config, nil) + + expected, err := url.Parse("s3://foo-bucket/example") + require.NoError(t, err) + + assert.Equal(t, "s3", config.Ruler.StoreConfig.Type) + + for _, actual := range []cortex_aws.S3Config{ + config.Ruler.StoreConfig.S3, + config.StorageConfig.AWSStorageConfig.S3Config.ToCortexS3Config(), + } { + require.NotNil(t, actual.S3.URL) + assert.Equal(t, *expected, *actual.S3.URL) + + assert.Equal(t, false, actual.S3ForcePathStyle) + assert.Equal(t, "s3://foo-bucket", actual.Endpoint) + assert.Equal(t, "us-east1", actual.Region) + assert.Equal(t, "abc123", actual.AccessKeyID) + assert.Equal(t, "def789", actual.SecretAccessKey) + assert.Equal(t, true, actual.Insecure) + assert.Equal(t, false, actual.SSEEncryption) + assert.Equal(t, 5*time.Minute, actual.HTTPConfig.IdleConnTimeout) + assert.Equal(t, 5*time.Minute, actual.HTTPConfig.ResponseHeaderTimeout) + assert.Equal(t, false, actual.HTTPConfig.InsecureSkipVerify) + assert.Equal(t, "v4", actual.SignatureVersion) + } + + //should remain empty + assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure) + assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local) + + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig) + assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig) + assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift) + assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig) + }) + + t.Run("when common gcs 
storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) { + gcsConfig := `common: + storage: + gcs: + bucket_name: foobar + chunk_buffer_size: 27 + request_timeout: 5m + enable_opencensus: true` + + config, defaults := testContext(gcsConfig, nil) + + assert.Equal(t, "gcs", config.Ruler.StoreConfig.Type) + + for _, actual := range []cortex_gcp.GCSConfig{ + config.Ruler.StoreConfig.GCS, + config.StorageConfig.GCSConfig.ToCortexGCSConfig(), + } { + assert.Equal(t, "foobar", actual.BucketName) + assert.Equal(t, 27, actual.ChunkBufferSize) + assert.Equal(t, 5*time.Minute, actual.RequestTimeout) + assert.Equal(t, true, actual.EnableOpenCensus) + } + + //should remain empty + assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure) + assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local) + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig) + assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config) + assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift) + assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig) + }) + + t.Run("when common azure storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) { + azureConfig := `common: + storage: + azure: + environment: earth + container_name: milkyway + account_name: 3rd_planet + account_key: water + download_buffer_size: 27 + upload_buffer_size: 42 + upload_buffer_count: 13 + request_timeout: 5m + max_retries: 3 + min_retry_delay: 10s + max_retry_delay: 10m` + + config, defaults := testContext(azureConfig, nil) + + 
assert.Equal(t, "azure", config.Ruler.StoreConfig.Type) + + for _, actual := range []cortex_azure.BlobStorageConfig{ + config.Ruler.StoreConfig.Azure, + config.StorageConfig.AzureStorageConfig.ToCortexAzureConfig(), + } { + assert.Equal(t, "earth", actual.Environment) + assert.Equal(t, "milkyway", actual.ContainerName) + assert.Equal(t, "3rd_planet", actual.AccountName) + assert.Equal(t, "water", actual.AccountKey.Value) + assert.Equal(t, 27, actual.DownloadBufferSize) + assert.Equal(t, 42, actual.UploadBufferSize) + assert.Equal(t, 13, actual.UploadBufferCount) + assert.Equal(t, 5*time.Minute, actual.RequestTimeout) + assert.Equal(t, 3, actual.MaxRetries) + assert.Equal(t, 10*time.Second, actual.MinRetryDelay) + assert.Equal(t, 10*time.Minute, actual.MaxRetryDelay) + } + + //should remain empty + assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS) + assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local) + + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig) + assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config) + assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift) + assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig) + }) + + t.Run("when common swift storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) { + swiftConfig := `common: + storage: + swift: + auth_version: 3 + auth_url: http://example.com + username: steve + user_domain_name: example.com + user_domain_id: 1 + user_id: 27 + password: supersecret + domain_id: 2 + domain_name: test.com + project_id: 13 + project_name: tower + 
project_domain_id: 3 + project_domain_name: tower.com + region_name: us-east1 + container_name: tupperware + max_retries: 6 + connect_timeout: 5m + request_timeout: 5s` + + config, defaults := testContext(swiftConfig, nil) + + assert.Equal(t, "swift", config.Ruler.StoreConfig.Type) + + for _, actual := range []cortex_swift.Config{ + config.Ruler.StoreConfig.Swift.Config, + config.StorageConfig.Swift.Config, + } { + assert.Equal(t, 3, actual.AuthVersion) + assert.Equal(t, "http://example.com", actual.AuthURL) + assert.Equal(t, "steve", actual.Username) + assert.Equal(t, "example.com", actual.UserDomainName) + assert.Equal(t, "1", actual.UserDomainID) + assert.Equal(t, "27", actual.UserID) + assert.Equal(t, "supersecret", actual.Password) + assert.Equal(t, "2", actual.DomainID) + assert.Equal(t, "test.com", actual.DomainName) + assert.Equal(t, "13", actual.ProjectID) + assert.Equal(t, "tower", actual.ProjectName) + assert.Equal(t, "3", actual.ProjectDomainID) + assert.Equal(t, "tower.com", actual.ProjectDomainName) + assert.Equal(t, "us-east1", actual.RegionName) + assert.Equal(t, "tupperware", actual.ContainerName) + assert.Equal(t, 6, actual.MaxRetries) + assert.Equal(t, 5*time.Minute, actual.ConnectTimeout) + assert.Equal(t, 5*time.Second, actual.RequestTimeout) + } + + //should remain empty + assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS) + assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local) + + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig) + assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config) + assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, 
config.StorageConfig.AzureStorageConfig) + assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig) + }) + + t.Run("when common filesystem/local config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) { + fsConfig := `common: + storage: + filesystem: + directory: /tmp/foo` + + config, defaults := testContext(fsConfig, nil) + + assert.Equal(t, "local", config.Ruler.StoreConfig.Type) + + for _, actual := range []cortex_local.Config{ + config.Ruler.StoreConfig.Local, + config.StorageConfig.FSConfig.ToCortexLocalConfig(), + } { + assert.Equal(t, "/tmp/foo", actual.Directory) + } + + //should remain empty + assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS) + assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure) + assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift) + + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig) + assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config) + assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig) + assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift) + }) + + t.Run("explicit ruler storage object storage configuration provided via config file is preserved", func(t *testing.T) { + specificRulerConfig := `common: + storage: + gcs: + bucket_name: foobar + chunk_buffer_size: 27 + request_timeout: 5m +ruler: + storage: + type: s3 + s3: + endpoint: s3://foo-bucket + region: us-east1 + access_key_id: abc123 + secret_access_key: def789` + config, defaults := testContext(specificRulerConfig, nil) + + assert.Equal(t, "s3", config.Ruler.StoreConfig.Type) + assert.Equal(t, "s3://foo-bucket", 
config.Ruler.StoreConfig.S3.Endpoint) + assert.Equal(t, "us-east1", config.Ruler.StoreConfig.S3.Region) + assert.Equal(t, "abc123", config.Ruler.StoreConfig.S3.AccessKeyID) + assert.Equal(t, "def789", config.Ruler.StoreConfig.S3.SecretAccessKey) + + //should remain empty + assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS) + + //should be set by common config + assert.EqualValues(t, "foobar", config.StorageConfig.GCSConfig.BucketName) + assert.EqualValues(t, 27, config.StorageConfig.GCSConfig.ChunkBufferSize) + assert.EqualValues(t, 5*time.Minute, config.StorageConfig.GCSConfig.RequestTimeout) + + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config) + }) + + t.Run("explicit storage config provided via config file is preserved", func(t *testing.T) { + specificRulerConfig := `common: + storage: + gcs: + bucket_name: foobar + chunk_buffer_size: 27 + request_timeout: 5m +storage_config: + aws: + endpoint: s3://foo-bucket + region: us-east1 + access_key_id: abc123 + secret_access_key: def789` + + config, defaults := testContext(specificRulerConfig, nil) + + assert.Equal(t, "s3://foo-bucket", config.StorageConfig.AWSStorageConfig.S3Config.Endpoint) + assert.Equal(t, "us-east1", config.StorageConfig.AWSStorageConfig.S3Config.Region) + assert.Equal(t, "abc123", config.StorageConfig.AWSStorageConfig.S3Config.AccessKeyID) + assert.Equal(t, "def789", config.StorageConfig.AWSStorageConfig.S3Config.SecretAccessKey) + + //should remain empty + assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig) + + //should be set by common config + assert.EqualValues(t, "foobar", config.Ruler.StoreConfig.GCS.BucketName) + assert.EqualValues(t, 27, config.Ruler.StoreConfig.GCS.ChunkBufferSize) + assert.EqualValues(t, 5*time.Minute, config.Ruler.StoreConfig.GCS.RequestTimeout) + + //should remain empty + assert.EqualValues(t, 
defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3) + }) + + t.Run("when common object store config is provided, compactor shared store is defaulted to use it", func(t *testing.T) { + for _, tt := range []struct { + configString string + expected string + }{ + { + configString: `common: + storage: + s3: + s3: s3://foo-bucket/example + access_key_id: abc123 + secret_access_key: def789`, + expected: storage.StorageTypeS3, + }, + { + configString: `common: + storage: + gcs: + bucket_name: foobar`, + expected: storage.StorageTypeGCS, + }, + { + configString: `common: + storage: + azure: + account_name: 3rd_planet + account_key: water`, + expected: storage.StorageTypeAzure, + }, + { + configString: `common: + storage: + swift: + username: steve + password: supersecret`, + expected: storage.StorageTypeSwift, + }, + { + configString: `common: + storage: + filesystem: + directory: /tmp/foo`, + expected: storage.StorageTypeFileSystem, + }, + } { + config, _ := testContext(tt.configString, nil) + + assert.Equal(t, tt.expected, config.CompactorConfig.SharedStoreType) + } + }) + + t.Run("explicit compactor shared_store config is preserved", func(t *testing.T) { + configString := `common: + storage: + s3: + s3: s3://foo-bucket/example + access_key_id: abc123 + secret_access_key: def789 +compactor: + shared_store: gcs` + config, _ := testContext(configString, nil) + + assert.Equal(t, "gcs", config.CompactorConfig.SharedStoreType) + }) + }) } // Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling diff --git a/pkg/storage/chunk/aws/s3_storage_client.go b/pkg/storage/chunk/aws/s3_storage_client.go index d65e4829f7268..38703825dea1c 100644 --- a/pkg/storage/chunk/aws/s3_storage_client.go +++ b/pkg/storage/chunk/aws/s3_storage_client.go @@ -28,6 +28,7 @@ import ( awscommon "github.com/weaveworks/common/aws" "github.com/weaveworks/common/instrument" + cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws" cortex_s3 
"github.com/cortexproject/cortex/pkg/storage/bucket/s3" "github.com/cortexproject/cortex/pkg/util" "github.com/grafana/dskit/flagext" @@ -125,6 +126,32 @@ func (cfg *S3Config) Validate() error { return nil } +func (cfg *S3Config) ToCortexS3Config() cortex_aws.S3Config { + return cortex_aws.S3Config{ + S3: cfg.S3, + S3ForcePathStyle: cfg.S3ForcePathStyle, + BucketNames: cfg.BucketNames, + Endpoint: cfg.Endpoint, + Region: cfg.Region, + AccessKeyID: cfg.AccessKeyID, + SecretAccessKey: cfg.SecretAccessKey, + Insecure: cfg.Insecure, + SSEEncryption: cfg.SSEEncryption, + HTTPConfig: cfg.HTTPConfig.ToCortexHTTPConfig(), + SignatureVersion: cfg.SignatureVersion, + SSEConfig: cfg.SSEConfig, + Inject: cortex_aws.InjectRequestMiddleware(cfg.Inject), + } +} + +func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig { + return cortex_aws.HTTPConfig{ + IdleConnTimeout: cfg.IdleConnTimeout, + ResponseHeaderTimeout: cfg.ResponseHeaderTimeout, + InsecureSkipVerify: cfg.InsecureSkipVerify, + } +} + type S3ObjectClient struct { bucketNames []string S3 s3iface.S3API diff --git a/pkg/storage/chunk/azure/blob_storage_client.go b/pkg/storage/chunk/azure/blob_storage_client.go index 8a0cca55df6d8..af4b4e50ab282 100644 --- a/pkg/storage/chunk/azure/blob_storage_client.go +++ b/pkg/storage/chunk/azure/blob_storage_client.go @@ -13,6 +13,7 @@ import ( "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" + cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/log" "github.com/grafana/dskit/flagext" @@ -87,6 +88,22 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.") } +func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig { + return 
cortex_azure.BlobStorageConfig{ + Environment: c.Environment, + ContainerName: c.ContainerName, + AccountName: c.AccountName, + AccountKey: c.AccountKey, + DownloadBufferSize: c.DownloadBufferSize, + UploadBufferSize: c.UploadBufferSize, + UploadBufferCount: c.UploadBufferCount, + RequestTimeout: c.RequestTimeout, + MaxRetries: c.MaxRetries, + MinRetryDelay: c.MinRetryDelay, + MaxRetryDelay: c.MaxRetryDelay, + } +} + // BlobStorage is used to interact with azure blob storage for setting or getting time series chunks. // Implements ObjectStorage type BlobStorage struct { diff --git a/pkg/storage/chunk/gcp/gcs_object_client.go b/pkg/storage/chunk/gcp/gcs_object_client.go index b73c22bd50b49..9878cbe1b9e5e 100644 --- a/pkg/storage/chunk/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/gcp/gcs_object_client.go @@ -7,6 +7,7 @@ import ( "time" "cloud.google.com/go/storage" + cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp" "github.com/pkg/errors" "google.golang.org/api/iterator" "google.golang.org/api/option" @@ -42,6 +43,15 @@ func (cfg *GCSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enabled OpenCensus (OC) instrumentation for all requests.") } +func (cfg *GCSConfig) ToCortexGCSConfig() cortex_gcp.GCSConfig { + return cortex_gcp.GCSConfig{ + BucketName: cfg.BucketName, + ChunkBufferSize: cfg.ChunkBufferSize, + RequestTimeout: cfg.RequestTimeout, + EnableOpenCensus: cfg.EnableOpenCensus, + } +} + // NewGCSObjectClient makes a new chunk.Client that writes chunks to GCS. 
func NewGCSObjectClient(ctx context.Context, cfg GCSConfig) (*GCSObjectClient, error) { var opts []option.ClientOption diff --git a/pkg/storage/chunk/local/fs_object_client.go b/pkg/storage/chunk/local/fs_object_client.go index b59607e49a83c..e671b6ff480fc 100644 --- a/pkg/storage/chunk/local/fs_object_client.go +++ b/pkg/storage/chunk/local/fs_object_client.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/runutil" + cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/grafana/loki/pkg/storage/chunk" @@ -34,6 +35,12 @@ func (cfg *FSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.Directory, prefix+"local.chunk-directory", "", "Directory to store chunks in.") } +func (cfg *FSConfig) ToCortexLocalConfig() cortex_local.Config { + return cortex_local.Config{ + Directory: cfg.Directory, + } +} + // FSObjectClient holds config for filesystem as object store type FSObjectClient struct { cfg FSConfig diff --git a/pkg/storage/chunk/openstack/swift_object_client.go b/pkg/storage/chunk/openstack/swift_object_client.go index e45f58a6c7cd3..e9a71d5e57ae6 100644 --- a/pkg/storage/chunk/openstack/swift_object_client.go +++ b/pkg/storage/chunk/openstack/swift_object_client.go @@ -11,6 +11,7 @@ import ( "github.com/ncw/swift" "github.com/pkg/errors" + cortex_openstack "github.com/cortexproject/cortex/pkg/chunk/openstack" cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift" "github.com/cortexproject/cortex/pkg/util/log" @@ -42,6 +43,12 @@ func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) cfg.Config.RegisterFlagsWithPrefix(prefix, f) } +func (cfg *SwiftConfig) ToCortexSwiftConfig() cortex_openstack.SwiftConfig { + return cortex_openstack.SwiftConfig{ + Config: cfg.Config, + } +} + // NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift. 
func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) { log.WarnExperimentalUse("OpenStack Swift Storage")