From b095fdcfd396415661f4588fca8c13901452e144 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 16 Nov 2018 18:16:44 +0000 Subject: [PATCH] Add test to trigger panic. Also, plumb through Stop on StorageClient, so the cachingStorageClient can stop its cache. Signed-off-by: Tom Wilkie --- aws/dynamodb_table_client.go | 3 +++ aws/storage_client.go | 3 +++ aws/storage_client_s3.go | 3 +++ cassandra/storage_client.go | 2 +- chunk_store.go | 1 + gcp/storage_client.go | 4 +++ inmemory_storage_client.go | 4 +++ storage/caching_storage_client.go | 4 +++ storage/factory_test.go | 41 +++++++++++++++++++++++++++++++ storage_client.go | 2 ++ 10 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 storage/factory_test.go diff --git a/aws/dynamodb_table_client.go b/aws/dynamodb_table_client.go index ea8ea73fa4450..549ab5c6c3dd5 100644 --- a/aws/dynamodb_table_client.go +++ b/aws/dynamodb_table_client.go @@ -69,6 +69,9 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { }, nil } +func (d *dynamoTableClient) Stop() { +} + func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context.Context) error) error { return d.callManager.backoffAndRetry(ctx, fn) } diff --git a/aws/storage_client.go b/aws/storage_client.go index 46ed06e6ec2b1..75320c2a67356 100644 --- a/aws/storage_client.go +++ b/aws/storage_client.go @@ -165,6 +165,9 @@ func NewStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.S return client, nil } +func (a storageClient) Stop() { +} + func (a storageClient) NewWriteBatch() chunk.WriteBatch { return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{}) } diff --git a/aws/storage_client_s3.go b/aws/storage_client_s3.go index 8b5c65c9e141d..8361bc212f766 100644 --- a/aws/storage_client_s3.go +++ b/aws/storage_client_s3.go @@ -73,6 +73,9 @@ func NewS3StorageClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk. return client, nil } +func (a s3storageClient) Stop() { +} + func (a s3storageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks.S3") defer sp.Finish() diff --git a/cassandra/storage_client.go b/cassandra/storage_client.go index 42a862c592894..060b260b584a3 100644 --- a/cassandra/storage_client.go +++ b/cassandra/storage_client.go @@ -138,7 +138,7 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageCl }, nil } -func (s *storageClient) Close() { +func (s *storageClient) Stop() { s.session.Close() } diff --git a/chunk_store.go b/chunk_store.go index 286d26d73513f..3290bfcf41dd3 100644 --- a/chunk_store.go +++ b/chunk_store.go @@ -102,6 +102,7 @@ func newStore(cfg StoreConfig, schema Schema, storage StorageClient, limits *val // Stop any background goroutines (ie in the cache.) func (c *store) Stop() { + c.storage.Stop() c.Fetcher.Stop() } diff --git a/gcp/storage_client.go b/gcp/storage_client.go index 288c1329311bf..1d1375101010d 100644 --- a/gcp/storage_client.go +++ b/gcp/storage_client.go @@ -98,6 +98,10 @@ func newStorageClientColumnKey(cfg Config, client *bigtable.Client, schemaCfg ch } } +func (s *storageClientColumnKey) Stop() { + s.client.Close() +} + func (s *storageClientColumnKey) NewWriteBatch() chunk.WriteBatch { return bigtableWriteBatch{ tables: map[string]map[string]*bigtable.Mutation{}, diff --git a/inmemory_storage_client.go b/inmemory_storage_client.go index 1a387992a4d81..000c6c072fa48 100644 --- a/inmemory_storage_client.go +++ b/inmemory_storage_client.go @@ -38,6 +38,10 @@ func NewMockStorage() *MockStorage { } } +// Stop doesn't do anything. +func (*MockStorage) Stop() { +} + // ListTables implements StorageClient. func (m *MockStorage) ListTables(_ context.Context) ([]string, error) { m.mtx.RLock() diff --git a/storage/caching_storage_client.go b/storage/caching_storage_client.go index 1240d12762ebf..cc09945b81ff0 100644 --- a/storage/caching_storage_client.go +++ b/storage/caching_storage_client.go @@ -56,6 +56,10 @@ func newCachingStorageClient(client chunk.StorageClient, c cache.Cache, validity } } +func (s *cachingStorageClient) Stop() { + s.cache.Stop() +} + func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { // We cache the entire row, so filter client side. callback = chunk_util.QueryFilter(callback) diff --git a/storage/factory_test.go b/storage/factory_test.go new file mode 100644 index 0000000000000..b09d90ce9cf6d --- /dev/null +++ b/storage/factory_test.go @@ -0,0 +1,41 @@ +package storage + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func TestFactoryStop(t *testing.T) { + var ( + cfg Config + storeConfig chunk.StoreConfig + schemaConfig chunk.SchemaConfig + defaults validation.Limits + ) + util.DefaultValues(&cfg, &storeConfig, &schemaConfig, &defaults) + schemaConfig.Configs = []chunk.PeriodConfig{ + { + From: model.Time(0), + Store: "inmemory", + }, + { + From: model.Time(1), + Store: "inmemory", + }, + } + cfg.memcacheClient.Host = "localhost" // Fake address that should at least resolve. + + limits, err := validation.NewOverrides(defaults) + require.NoError(t, err) + + store, err := NewStore(cfg, storeConfig, schemaConfig, limits) + require.NoError(t, err) + + store.Stop() +} diff --git a/storage_client.go b/storage_client.go index ecf83f7d62262..167924851e134 100644 --- a/storage_client.go +++ b/storage_client.go @@ -4,6 +4,8 @@ import "context" // StorageClient is a client for the persistent storage for Cortex. (e.g. DynamoDB + S3). type StorageClient interface { + Stop() + // For the write path. NewWriteBatch() WriteBatch BatchWrite(context.Context, WriteBatch) error