Add test to trigger panic.
Also, plumb through Stop on StorageClient, so the cachingStorageClient can stop its cache.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
tomwilkie committed Nov 19, 2018
1 parent 6140e26 commit b095fdc
Showing 10 changed files with 66 additions and 1 deletion.
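
In short: the chunk.StorageClient interface gains a Stop() method, every backend gets an implementation (several of them no-ops), and the caching client uses it to shut down its cache when the chunk store stops. Below is a condensed sketch of the shape of the change, pieced together from the diffs that follow rather than copied from any single Cortex file; the WriteBatch and Cache types here are stand-ins for the real chunk and chunk/cache types.

// Condensed sketch only; not the actual Cortex file layout.
package sketch

import "context"

// WriteBatch stands in for the real chunk.WriteBatch interface.
type WriteBatch interface{}

// StorageClient now carries Stop so callers can release resources
// (Cassandra sessions, Bigtable clients, cache goroutines) on shutdown.
type StorageClient interface {
	Stop()

	// For the write path.
	NewWriteBatch() WriteBatch
	BatchWrite(context.Context, WriteBatch) error
	// ... read-path methods elided ...
}

// Cache stands in for the chunk/cache.Cache interface, which already
// exposes its own Stop.
type Cache interface {
	Stop()
}

// cachingStorageClient mirrors storage/caching_storage_client.go below:
// stopping it stops the cache's background goroutines.
type cachingStorageClient struct {
	client StorageClient
	cache  Cache
}

func (s *cachingStorageClient) Stop() {
	s.cache.Stop()
}

With this in place, store.Stop() in chunk_store.go can call c.storage.Stop() before stopping the Fetcher, which is the path the new TestFactoryStop below exercises.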
3 changes: 3 additions & 0 deletions aws/dynamodb_table_client.go
@@ -69,6 +69,9 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) {
 	}, nil
 }
 
+func (d *dynamoTableClient) Stop() {
+}
+
 func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context.Context) error) error {
 	return d.callManager.backoffAndRetry(ctx, fn)
 }
3 changes: 3 additions & 0 deletions aws/storage_client.go
@@ -165,6 +165,9 @@ func NewStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.S
 	return client, nil
 }
 
+func (a storageClient) Stop() {
+}
+
 func (a storageClient) NewWriteBatch() chunk.WriteBatch {
 	return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
 }
3 changes: 3 additions & 0 deletions aws/storage_client_s3.go
@@ -73,6 +73,9 @@ func NewS3StorageClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.
 	return client, nil
 }
 
+func (a s3storageClient) Stop() {
+}
+
 func (a s3storageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
 	sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks.S3")
 	defer sp.Finish()
2 changes: 1 addition & 1 deletion cassandra/storage_client.go
@@ -138,7 +138,7 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageCl
 	}, nil
 }
 
-func (s *storageClient) Close() {
+func (s *storageClient) Stop() {
 	s.session.Close()
 }
 
1 change: 1 addition & 0 deletions chunk_store.go
@@ -102,6 +102,7 @@ func newStore(cfg StoreConfig, schema Schema, storage StorageClient, limits *val
 
 // Stop any background goroutines (ie in the cache.)
 func (c *store) Stop() {
+	c.storage.Stop()
 	c.Fetcher.Stop()
 }
 
4 changes: 4 additions & 0 deletions gcp/storage_client.go
@@ -98,6 +98,10 @@ func newStorageClientColumnKey(cfg Config, client *bigtable.Client, schemaCfg ch
 	}
 }
 
+func (s *storageClientColumnKey) Stop() {
+	s.client.Close()
+}
+
 func (s *storageClientColumnKey) NewWriteBatch() chunk.WriteBatch {
 	return bigtableWriteBatch{
 		tables: map[string]map[string]*bigtable.Mutation{},
4 changes: 4 additions & 0 deletions inmemory_storage_client.go
@@ -38,6 +38,10 @@ func NewMockStorage() *MockStorage {
 	}
 }
 
+// Stop doesn't do anything.
+func (*MockStorage) Stop() {
+}
+
 // ListTables implements StorageClient.
 func (m *MockStorage) ListTables(_ context.Context) ([]string, error) {
 	m.mtx.RLock()
4 changes: 4 additions & 0 deletions storage/caching_storage_client.go
@@ -56,6 +56,10 @@ func newCachingStorageClient(client chunk.StorageClient, c cache.Cache, validity
 	}
 }
 
+func (s *cachingStorageClient) Stop() {
+	s.cache.Stop()
+}
+
 func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
 	// We cache the entire row, so filter client side.
 	callback = chunk_util.QueryFilter(callback)
41 changes: 41 additions & 0 deletions storage/factory_test.go
@@ -0,0 +1,41 @@
+package storage
+
+import (
+	"testing"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/validation"
+)
+
+func TestFactoryStop(t *testing.T) {
+	var (
+		cfg          Config
+		storeConfig  chunk.StoreConfig
+		schemaConfig chunk.SchemaConfig
+		defaults     validation.Limits
+	)
+	util.DefaultValues(&cfg, &storeConfig, &schemaConfig, &defaults)
+	schemaConfig.Configs = []chunk.PeriodConfig{
+		{
+			From:  model.Time(0),
+			Store: "inmemory",
+		},
+		{
+			From:  model.Time(1),
+			Store: "inmemory",
+		},
+	}
+	cfg.memcacheClient.Host = "localhost" // Fake address that should at least resolve.
+
+	limits, err := validation.NewOverrides(defaults)
+	require.NoError(t, err)
+
+	store, err := NewStore(cfg, storeConfig, schemaConfig, limits)
+	require.NoError(t, err)
+
+	store.Stop()
+}
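
The test above doubles as a usage example: build a store with NewStore, then call Stop to tear down the storage client and, through the caching wrapper, its cache. A minimal sketch of that pattern, assuming the same package as the test and that NewStore takes a *validation.Overrides and returns a chunk.Store, as the test implies; the helper name is made up for illustration.

// Sketch only; mirrors the setup in TestFactoryStop above.
func newStoreWithCleanup(cfg Config, storeConfig chunk.StoreConfig,
	schemaConfig chunk.SchemaConfig, limits *validation.Overrides) (chunk.Store, func(), error) {

	store, err := NewStore(cfg, storeConfig, schemaConfig, limits)
	if err != nil {
		return nil, nil, err
	}
	// Stop now cascades: store -> storage client -> cache (and Fetcher).
	return store, store.Stop, nil
}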
2 changes: 2 additions & 0 deletions storage_client.go
@@ -4,6 +4,8 @@ import "context"
 
 // StorageClient is a client for the persistent storage for Cortex. (e.g. DynamoDB + S3).
 type StorageClient interface {
+	Stop()
+
 	// For the write path.
 	NewWriteBatch() WriteBatch
 	BatchWrite(context.Context, WriteBatch) error
