diff --git a/Gopkg.lock b/Gopkg.lock index 9e51ec61956a5..c31275b068b2f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -187,7 +187,7 @@ [[projects]] branch = "lazy-load-chunks" - digest = "1:bf1fa66c54722bc8664f1465e427cd6fe7df52f2b6fd5ab996baf37601687b70" + digest = "1:a999c29b3a215dfc12d374a9aac09c94c1b72ef530f4e39d9ab3ae1468cfe8e8" name = "github.com/cortexproject/cortex" packages = [ "pkg/chunk", @@ -213,7 +213,7 @@ "pkg/util/validation", ] pruneopts = "UT" - revision = "95a3f308e95617732b76e337874e83ccf173cf14" + revision = "61b92520b0c1afdef6e42b7a27cca6c715e9f386" source = "https://github.com/grafana/cortex" [[projects]] diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet index 71552b54e8d5b..7ba40a7aaa14f 100644 --- a/production/ksonnet/loki/config.libsonnet +++ b/production/ksonnet/loki/config.libsonnet @@ -67,6 +67,44 @@ gcs: { bucket_name: $._config.gcs_bucket_name, }, + + index_queries_cache_config: { + memcached: { + batch_size: 100, + parallelism: 100, + }, + + memcached_client: { + host: 'memcached-index-queries.%s.svc.cluster.local' % $._config.namespace, + service: 'memcached-client', + }, + }, + }, + + chunk_store_config: { + chunk_cache_config: { + memcached: { + batch_size: 100, + parallelism: 100, + }, + + memcached_client: { + host: 'memcached.%s.svc.cluster.local' % $._config.namespace, + service: 'memcached-client', + }, + }, + + write_dedupe_cache_config: { + memcached: { + batch_size: 100, + parallelism: 100, + }, + + memcached_client: { + host: 'memcached-index-writes.%s.svc.cluster.local' % $._config.namespace, + service: 'memcached-client', + }, + }, }, schema_config: { diff --git a/production/ksonnet/loki/jsonnetfile.json b/production/ksonnet/loki/jsonnetfile.json index 49ec3785ab436..7f40dd6152e7d 100644 --- a/production/ksonnet/loki/jsonnetfile.json +++ b/production/ksonnet/loki/jsonnetfile.json @@ -19,6 +19,16 @@ } }, "version": "master" + }, + { + "name": "memcached", + "source": { + "git": { + "remote": "https://github.com/grafana/jsonnet-libs", + "subdir": "memcached" + } + }, + "version": "master" } ] } diff --git a/production/ksonnet/loki/loki.libsonnet b/production/ksonnet/loki/loki.libsonnet index 59ee5bf2b20b9..beebd9f780032 100644 --- a/production/ksonnet/loki/loki.libsonnet +++ b/production/ksonnet/loki/loki.libsonnet @@ -4,8 +4,11 @@ (import 'config.libsonnet') + (import 'consul/consul.libsonnet') + -// Cortex services +// Loki services (import 'distributor.libsonnet') + (import 'ingester.libsonnet') + (import 'querier.libsonnet') + -(import 'table-manager.libsonnet') +(import 'table-manager.libsonnet') + + +// Supporting services +(import 'memcached.libsonnet') diff --git a/production/ksonnet/loki/memcached.libsonnet b/production/ksonnet/loki/memcached.libsonnet new file mode 100644 index 0000000000000..8634a398900d2 --- /dev/null +++ b/production/ksonnet/loki/memcached.libsonnet @@ -0,0 +1,21 @@ +local memcached = 'memcached/memcached.libsonnet'; + +memcached { + // Memcached instance used to cache chunks. + memcached_chunks: $.memcached { + name: 'memcached', + max_item_size: '2m', + memory_limit_mb: 4096, + }, + + // Dedicated memcached instance used to temporarily cache index lookups. + memcached_index_queries: $.memcached { + name: 'memcached-index-queries', + max_item_size: '5m', + }, + + // Dedicated memcached instance used to dedupe writes to the index. + memcached_index_writes: $.memcached { + name: 'memcached-index-writes', + }, +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go index a1dff2849329f..3356eaa9ccbcf 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log/level" ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -25,6 +24,7 @@ import ( chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/spanlogger" awscommon "github.com/weaveworks/common/aws" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" @@ -79,12 +79,6 @@ var ( // metric names. Buckets: prometheus.ExponentialBuckets(1, 4, 6), }) - dynamoQueryRetryCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "dynamo_query_retry_count", - Help: "Number of retries per DynamoDB operation.", - Buckets: prometheus.LinearBuckets(0, 1, 21), - }, []string{"operation"}) ) func init() { @@ -92,7 +86,6 @@ func init() { prometheus.MustRegister(dynamoConsumedCapacity) prometheus.MustRegister(dynamoFailures) prometheus.MustRegister(dynamoQueryPagesCount) - prometheus.MustRegister(dynamoQueryRetryCount) prometheus.MustRegister(dynamoDroppedRequests) } @@ -212,9 +205,6 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write unprocessed := dynamoDBWriteBatch{} backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) - defer func() { - dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries())) - }() for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() { requests := dynamoDBWriteBatch{} @@ -354,9 +344,6 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery func (a dynamoDBStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest, hashValue string, pageCount int) (*dynamoDBReadResponse, error) { backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) - defer func() { - dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) - }() var err error for backoff.Ongoing() { @@ -464,9 +451,9 @@ type chunksPlusError struct { // GetChunks implements chunk.ObjectClient. func (a dynamoDBStorageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { - sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks.DynamoDB") - defer sp.Finish() - sp.LogFields(otlog.Int("chunks requested", len(chunks))) + log, ctx := spanlogger.New(ctx, "GetChunks.DynamoDB", ot.Tag{Key: "numChunks", Value: len(chunks)}) + defer log.Span.Finish() + level.Debug(log).Log("chunks requested", len(chunks)) dynamoDBChunks := chunks var err error @@ -499,13 +486,10 @@ func (a dynamoDBStorageClient) GetChunks(ctx context.Context, chunks []chunk.Chu } finalChunks = append(finalChunks, in.chunks...) } - sp.LogFields(otlog.Int("chunks fetched", len(finalChunks))) - if err != nil { - sp.LogFields(otlog.String("error", err.Error())) - } + level.Debug(log).Log("chunks fetched", len(finalChunks)) // Return any chunks we did receive: a partial result may be useful - return finalChunks, err + return finalChunks, log.Error(err) } // As we're re-using the DynamoDB schema from the index for the chunk tables, @@ -516,8 +500,8 @@ var placeholder = []byte{'c'} // Structure is identical to BatchWrite(), but operating on different datatypes // so cannot share implementation. If you fix a bug here fix it there too. func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { - sp, ctx := ot.StartSpanFromContext(ctx, "getDynamoDBChunks", ot.Tag{Key: "numChunks", Value: len(chunks)}) - defer sp.Finish() + log, ctx := spanlogger.New(ctx, "getDynamoDBChunks", ot.Tag{Key: "numChunks", Value: len(chunks)}) + defer log.Span.Finish() outstanding := dynamoDBReadRequest{} chunksByKey := map[string]chunk.Chunk{} for _, chunk := range chunks { @@ -525,7 +509,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c chunksByKey[key] = chunk tableName, err := a.schemaCfg.ChunkTableFor(chunk.From) if err != nil { - return nil, err + return nil, log.Error(err) } outstanding.Add(tableName, key, placeholder) } @@ -533,9 +517,6 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c result := []chunk.Chunk{} unprocessed := dynamoDBReadRequest{} backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) - defer func() { - dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries())) - }() for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() { requests := dynamoDBReadRequest{} @@ -570,9 +551,8 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c continue } else if ok && awsErr.Code() == validationException { // this read will never work, so the only option is to drop the offending request and continue. - level.Warn(util.Logger).Log("msg", "Error while fetching data from Dynamo", "err", awsErr) - level.Debug(util.Logger).Log("msg", "Dropped request details", "requests", requests) - util.Event().Log("msg", "ValidationException", "requests", requests) + level.Warn(log).Log("msg", "Error while fetching data from Dynamo", "err", awsErr) + level.Debug(log).Log("msg", "Dropped request details", "requests", requests) // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context // to determine if a request was dropped (or not) for tableName := range requests { @@ -587,7 +567,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c processedChunks, err := processChunkResponse(response, chunksByKey) if err != nil { - return nil, err + return nil, log.Error(err) } result = append(result, processedChunks...) @@ -601,7 +581,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 { // Return the chunks we did fetch, because partial results may be useful - return result, fmt.Errorf("failed to query chunks, %d values remaining: %s", valuesLeft, backoff.Err()) + return result, log.Error(fmt.Errorf("failed to query chunks, %d values remaining: %s", valuesLeft, backoff.Err())) } return result, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go index 6b552047c03aa..ea589630725a1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go @@ -26,8 +26,8 @@ var ( // BackgroundConfig is config for a Background Cache. type BackgroundConfig struct { - WriteBackGoroutines int - WriteBackBuffer int + WriteBackGoroutines int `yaml:"writeback_goroutines,omitempty"` + WriteBackBuffer int `yaml:"writeback_buffer,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go index 62ef40958da93..8dbe001be03c9 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go @@ -15,19 +15,19 @@ type Cache interface { // Config for building Caches. type Config struct { - EnableDiskcache bool - EnableFifoCache bool + EnableDiskcache bool `yaml:"enable_diskcache,omitempty"` + EnableFifoCache bool `yaml:"enable_fifocache,omitempty"` - DefaultValidity time.Duration + DefaultValidity time.Duration `yaml:"defaul_validity,omitempty"` - background BackgroundConfig - memcache MemcachedConfig - memcacheClient MemcachedClientConfig - diskcache DiskcacheConfig - fifocache FifoCacheConfig + Background BackgroundConfig `yaml:"background,omitempty"` + Memcache MemcachedConfig `yaml:"memcached,omitempty"` + MemcacheClient MemcachedClientConfig `yaml:"memcached_client,omitempty"` + Diskcache DiskcacheConfig `yaml:"diskcache,omitempty"` + Fifocache FifoCacheConfig `yaml:"fifocache,omitempty"` // This is to name the cache metrics properly. - prefix string + Prefix string `yaml:"prefix,omitempty"` // For tests to inject specific implementations. Cache Cache @@ -35,17 +35,17 @@ type Config struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { - cfg.background.RegisterFlagsWithPrefix(prefix, description, f) - cfg.memcache.RegisterFlagsWithPrefix(prefix, description, f) - cfg.memcacheClient.RegisterFlagsWithPrefix(prefix, description, f) - cfg.diskcache.RegisterFlagsWithPrefix(prefix, description, f) - cfg.fifocache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Background.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Diskcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) f.BoolVar(&cfg.EnableDiskcache, prefix+"cache.enable-diskcache", false, description+"Enable on-disk cache.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", 0, description+"The default validity of entries for caches unless overridden.") - cfg.prefix = prefix + cfg.Prefix = prefix } // New creates a new Cache using Config. @@ -57,39 +57,39 @@ func New(cfg Config) (Cache, error) { caches := []Cache{} if cfg.EnableFifoCache { - if cfg.fifocache.Validity == 0 && cfg.DefaultValidity != 0 { - cfg.fifocache.Validity = cfg.DefaultValidity + if cfg.Fifocache.Validity == 0 && cfg.DefaultValidity != 0 { + cfg.Fifocache.Validity = cfg.DefaultValidity } - cache := NewFifoCache(cfg.prefix+"fifocache", cfg.fifocache) - caches = append(caches, Instrument(cfg.prefix+"fifocache", cache)) + cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache) + caches = append(caches, Instrument(cfg.Prefix+"fifocache", cache)) } if cfg.EnableDiskcache { - cache, err := NewDiskcache(cfg.diskcache) + cache, err := NewDiskcache(cfg.Diskcache) if err != nil { return nil, err } - cacheName := cfg.prefix + "diskcache" - caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache))) + cacheName := cfg.Prefix + "diskcache" + caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache))) } - if cfg.memcacheClient.Host != "" { - if cfg.memcache.Expiration == 0 && cfg.DefaultValidity != 0 { - cfg.memcache.Expiration = cfg.DefaultValidity + if cfg.MemcacheClient.Host != "" { + if cfg.Memcache.Expiration == 0 && cfg.DefaultValidity != 0 { + cfg.Memcache.Expiration = cfg.DefaultValidity } - client := NewMemcachedClient(cfg.memcacheClient) - cache := NewMemcached(cfg.memcache, client, cfg.prefix) + client := NewMemcachedClient(cfg.MemcacheClient) + cache := NewMemcached(cfg.Memcache, client, cfg.Prefix) - cacheName := cfg.prefix + "memcache" - caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache))) + cacheName := cfg.Prefix + "memcache" + caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache))) } cache := NewTiered(caches) if len(caches) > 1 { - cache = Instrument(cfg.prefix+"tiered", cache) + cache = Instrument(cfg.Prefix+"tiered", cache) } return cache, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go index 6463827f73d6e..28e3ac4f72d20 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go @@ -54,8 +54,8 @@ const ( // DiskcacheConfig for the Disk cache. type DiskcacheConfig struct { - Path string - Size int + Path string `yaml:"path,omitempty"` + Size int `yaml:"size,omitempty"` } // RegisterFlags adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go index 59c7d35b71f10..bd7b2be44a545 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go @@ -56,8 +56,8 @@ var ( // FifoCacheConfig holds config for the FifoCache. type FifoCacheConfig struct { - Size int - Validity time.Duration + Size int `yaml:"size,omitempty"` + Validity time.Duration `yaml:"validity,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go index 37836c77df018..dce4652018784 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go @@ -40,10 +40,10 @@ func (o observableVecCollector) After(method, statusCode string, start time.Time // MemcachedConfig is config to make a Memcached type MemcachedConfig struct { - Expiration time.Duration + Expiration time.Duration `yaml:"expiration,omitempty"` - BatchSize int - Parallelism int + BatchSize int `yaml:"batch_size,omitempty"` + Parallelism int `yaml:"parallelism,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go index d43fa93713f58..43e40cbbbaaa7 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go @@ -33,11 +33,11 @@ type memcachedClient struct { // MemcachedClientConfig defines how a MemcachedClient should be constructed. type MemcachedClientConfig struct { - Host string - Service string - Timeout time.Duration - MaxIdleConns int - UpdateInterval time.Duration + Host string `yaml:"host,omitempty"` + Service string `yaml:"service,omitempty"` + Timeout time.Duration `yaml:"timeout,omitempty"` + MaxIdleConns int `yaml:"max_idle_conns,omitempty"` + UpdateInterval time.Duration `yaml:"update_interval,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go index 1ce3b1b9ae3e0..009fd8eb3f9e2 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go @@ -269,7 +269,7 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { return err } c.encoded = input - return c.Data.UnmarshalFromBuf(input) + return errors.Wrap(c.Data.UnmarshalFromBuf(input), "when unmarshalling legacy chunk") } // First, calculate the checksum of the chunk and confirm it matches @@ -282,14 +282,14 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { r := bytes.NewReader(input) var metadataLen uint32 if err := binary.Read(r, binary.BigEndian, &metadataLen); err != nil { - return err + return errors.Wrap(err, "when reading metadata length from chunk") } var tempMetadata Chunk decodeContext.reader.Reset(r) json := jsoniter.ConfigFastest err := json.NewDecoder(decodeContext.reader).Decode(&tempMetadata) if err != nil { - return err + return errors.Wrap(err, "when decoding chunk metadata") } if len(input)-r.Len() != int(metadataLen) { return ErrMetadataLength @@ -315,12 +315,12 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { // Finally, unmarshal the actual chunk data. c.Data, err = prom_chunk.NewForEncoding(c.Encoding) if err != nil { - return err + return errors.Wrap(err, "when creating new chunk") } var dataLen uint32 if err := binary.Read(r, binary.BigEndian, &dataLen); err != nil { - return err + return errors.Wrap(err, "when reading data length from chunk") } c.encoded = input diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index b3a82eb960ca6..836c5d4872f99 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -2,6 +2,7 @@ package chunk import ( "context" + "errors" "flag" "fmt" "net/http" @@ -10,7 +11,6 @@ import ( "time" "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -56,11 +56,11 @@ func init() { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { - ChunkCacheConfig cache.Config - WriteDedupeCacheConfig cache.Config + ChunkCacheConfig cache.Config `yaml:"chunk_cache_config,omitempty"` + WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config,omitempty"` - MinChunkAge time.Duration - CacheLookupsOlderThan time.Duration + MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"` + CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -72,8 +72,8 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.") // Deprecated. - flagext.DeprecatedFlag(f, "store.cardinality-cache-size", "DEPRECATED. Use store.index-cache-size.enable-fifocache and store.cardinality-cache.fifocache.size instead.") - flagext.DeprecatedFlag(f, "store.cardinality-cache-validity", "DEPRECATED. Use store.index-cache-size.enable-fifocache and store.cardinality-cache.fifocache.duration instead.") + flagext.DeprecatedFlag(f, "store.cardinality-cache-size", "DEPRECATED. Use store.index-cache-read.enable-fifocache and store.index-cache-read.fifocache.size instead.") + flagext.DeprecatedFlag(f, "store.cardinality-cache-validity", "DEPRECATED. Use store.index-cache-read.enable-fifocache and store.index-cache-read.fifocache.duration instead.") } // store implements Store @@ -121,21 +121,16 @@ func (c *store) Put(ctx context.Context, chunks []Chunk) error { // PutOne implements ChunkStore func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return err - } - chunks := []Chunk{chunk} - err = c.storage.PutChunks(ctx, chunks) + err := c.storage.PutChunks(ctx, chunks) if err != nil { return err } c.writeBackCache(ctx, chunks) - writeReqs, err := c.calculateIndexEntries(userID, from, through, chunk) + writeReqs, err := c.calculateIndexEntries(chunk.UserID, from, through, chunk) if err != nil { return err } @@ -193,22 +188,64 @@ func (c *store) GetChunkRefs(ctx context.Context, from, through model.Time, allM return nil, nil, errors.New("not implemented") } -func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) { - log, ctx := spanlogger.New(ctx, "store.validateQuery") +// LabelValuesForMetricName retrieves all label values for a single label name and metric name. +func (c *store) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName, labelName string) ([]string, error) { + log, ctx := spanlogger.New(ctx, "ChunkStore.LabelValues") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName) + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + shortcut, err := c.validateQueryTimeRange(ctx, from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(labelName)) + if err != nil { + return nil, err + } + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + + var result []string + for _, entry := range entries { + _, labelValue, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + if err != nil { + return nil, err + } + result = append(result, string(labelValue)) + } + + sort.Strings(result) + result = uniqueStrings(result) + return result, nil +} + +func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, through *model.Time) (bool, error) { + log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange") defer log.Span.Finish() if *through < from { - return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from) + return false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from) } userID, err := user.ExtractOrgID(ctx) if err != nil { - return "", nil, false, err + return false, err } maxQueryLength := c.limits.MaxQueryLength(userID) if maxQueryLength > 0 && (*through).Sub(from) > maxQueryLength { - return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength) + return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength) } now := model.Now() @@ -216,12 +253,12 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod if from.After(now) { // time-span start is in future ... regard as legal level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) - return "", nil, true, nil + return true, nil } if from.After(now.Add(-c.cfg.MinChunkAge)) { // no data relevant to this query will have arrived at the store yet - return "", nil, true, nil + return true, nil } if through.After(now.Add(5 * time.Minute)) { @@ -230,6 +267,21 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod *through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } + return false, nil +} + +func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) { + log, ctx := spanlogger.New(ctx, "store.validateQuery") + defer log.Span.Finish() + + shortcut, err := c.validateQueryTimeRange(ctx, from, through) + if err != nil { + return "", nil, false, err + } + if shortcut { + return "", nil, true, nil + } + // Check there is a metric name matcher of type equal, metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers) if !ok || metricNameMatcher.Type != labels.MatchEqual { @@ -257,7 +309,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim level.Debug(log).Log("Chunks in index", len(chunks)) // Filter out chunks that are not in the selected time range. - filtered, keys := filterChunksByTime(from, through, chunks) + filtered := filterChunksByTime(from, through, chunks) level.Debug(log).Log("Chunks post filtering", len(chunks)) maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) @@ -268,6 +320,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim } // Now fetch the actual chunk data from Memcache / S3 + keys := keysFromChunks(filtered) allChunks, err := c.FetchChunks(ctx, filtered, keys) if err != nil { return nil, promql.ErrStorage{Err: err} @@ -307,7 +360,7 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode } level.Debug(log).Log("chunkIDs", len(chunkIDs)) - return c.convertChunkIDsToChunks(ctx, chunkIDs) + return c.convertChunkIDsToChunks(ctx, userID, chunkIDs) } // Otherwise get chunks which include other matchers @@ -369,7 +422,7 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode level.Debug(log).Log("msg", "post intersection", "chunkIDs", len(chunkIDs)) // Convert IndexEntry's into chunks - return c.convertChunkIDsToChunks(ctx, chunkIDs) + return c.convertChunkIDsToChunks(ctx, userID, chunkIDs) } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { @@ -416,12 +469,7 @@ func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat return result, nil } -func (c *store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - +func (c *store) convertChunkIDsToChunks(ctx context.Context, userID string, chunkIDs []string) ([]Chunk, error) { chunkSet := make([]Chunk, 0, len(chunkIDs)) for _, chunkID := range chunkIDs { chunk, err := ParseExternalKey(userID, chunkID) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go index b61a2eaa60545..28f323a427749 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go @@ -16,17 +16,24 @@ import ( const chunkDecodeParallelism = 16 -func filterChunksByTime(from, through model.Time, chunks []Chunk) ([]Chunk, []string) { +func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk { filtered := make([]Chunk, 0, len(chunks)) - keys := make([]string, 0, len(chunks)) for _, chunk := range chunks { if chunk.Through < from || through < chunk.From { continue } filtered = append(filtered, chunk) - keys = append(keys, chunk.ExternalKey()) } - return filtered, keys + return filtered +} + +func keysFromChunks(chunks []Chunk) []string { + keys := make([]string, 0, len(chunks)) + for _, chk := range chunks { + keys = append(keys, chk.ExternalKey()) + } + + return keys } func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go index bd3d21612f1b8..762aa424e3625 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go @@ -15,7 +15,10 @@ type Store interface { Put(ctx context.Context, chunks []Chunk) error PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) + // GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk), + // using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) + LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) Stop() } @@ -89,6 +92,20 @@ func (c compositeStore) Get(ctx context.Context, from, through model.Time, match return results, err } +// LabelValuesForMetricName retrieves all label values for a single label name and metric name. +func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) { + var result []string + err := c.forStores(from, through, func(from, through model.Time, store Store) error { + labelValues, err := store.LabelValuesForMetricName(ctx, from, through, metricName, labelName) + if err != nil { + return err + } + result = append(result, labelValues...) + return nil + }) + return result, err +} + func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) { chunkIDs := [][]Chunk{} fetchers := []*Fetcher{} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go index 05417d4650556..0dfe256d8af21 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go @@ -46,9 +46,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.") cfg.GRPCClientConfig.RegisterFlags("bigtable", f) - - // Deprecated. - f.Int("bigtable.max-recv-msg-size", 100<<20, "DEPRECATED. Bigtable grpc max receive message size.") } // storageClientColumnKey implements chunk.storageClient for GCP. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go index a87cd10b3dffb..99b211cdf0aca 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go @@ -33,9 +33,9 @@ var ( Name: "gcs_request_duration_seconds", Help: "Time spent doing GCS requests.", - // Bigtable latency seems to range from a few ms to a few hundred ms and is - // important. So use 6 buckets from 1ms to 1s. - Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + // GCS latency seems to range from a few ms to a few secs and is + // important. So use 6 buckets from 5ms to 5s. + Buckets: prometheus.ExponentialBuckets(0.005, 4, 6), }, []string{"operation", "status_code"}) ) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go index ec01b8415d00e..aa522284852c0 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go @@ -195,14 +195,6 @@ func (cfg PeriodConfig) createSchema() Schema { return s } -func (cfg *PeriodConfig) tableForBucket(bucketStart int64) string { - if cfg.IndexTables.Period == 0 { - return cfg.IndexTables.Prefix - } - // TODO remove reference to time package here - return cfg.IndexTables.Prefix + strconv.Itoa(int(bucketStart/int64(cfg.IndexTables.Period/time.Second))) -} - // Load the yaml file, or build the config from legacy command-line flags func (cfg *SchemaConfig) Load() error { if len(cfg.Configs) > 0 { @@ -268,7 +260,7 @@ func (cfg *PeriodConfig) hourlyBuckets(from, through model.Time, userID string) result = append(result, Bucket{ from: uint32(relativeFrom), through: uint32(relativeThrough), - tableName: cfg.tableForBucket(i * secondsInHour), + tableName: cfg.IndexTables.TableFor(model.TimeFromUnix(i * secondsInHour)), hashKey: fmt.Sprintf("%s:%d", userID, i), }) } @@ -303,7 +295,7 @@ func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) [ result = append(result, Bucket{ from: uint32(relativeFrom), through: uint32(relativeThrough), - tableName: cfg.tableForBucket(i * secondsInDay), + tableName: cfg.IndexTables.TableFor(model.TimeFromUnix(i * secondsInDay)), hashKey: fmt.Sprintf("%s:d%d", userID, i), }) } @@ -368,8 +360,7 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr } for i := firstTable; i <= lastTable; i++ { table := TableDesc{ - // Name construction needs to be consistent with chunk_store.bigBuckets - Name: cfg.Prefix + strconv.Itoa(int(i)), + Name: cfg.tableForPeriod(i), ProvisionedRead: pCfg.InactiveReadThroughput, ProvisionedWrite: pCfg.InactiveWriteThroughput, UseOnDemandIOMode: pCfg.InactiveThroughputOnDemandMode, @@ -443,9 +434,13 @@ func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) { // TableFor calculates the table shard for a given point in time. func (cfg *PeriodicTableConfig) TableFor(t model.Time) string { - var ( - periodSecs = int64(cfg.Period / time.Second) - table = t.Unix() / periodSecs - ) - return cfg.Prefix + strconv.Itoa(int(table)) + if cfg.Period == 0 { // non-periodic + return cfg.Prefix + } + periodSecs := int64(cfg.Period / time.Second) + return cfg.tableForPeriod(t.Unix() / periodSecs) +} + +func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string { + return cfg.Prefix + strconv.Itoa(int(i)) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index 4a0b742e6af28..a7d618c3854bd 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -7,7 +7,6 @@ import ( "net/http" "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -22,11 +21,19 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -var ( - // ErrCardinalityExceeded is returned when the user reads a row that - // is too large. - ErrCardinalityExceeded = errors.New("cardinality limit exceeded") +// CardinalityExceededError is returned when the user reads a row that +// is too large. +type CardinalityExceededError struct { + MetricName, LabelName string + Size, Limit int32 +} +func (e CardinalityExceededError) Error() string { + return fmt.Sprintf("cardinality limit exceeded for %s{%s}; %d entries, more than limit of %d", + e.MetricName, e.LabelName, e.Size, e.Limit) +} + +var ( indexLookupsPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_index_lookups_per_query", @@ -104,7 +111,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc return nil, err } - chks, _, err := c.GetChunkRefs(ctx, from, through, allMatchers...) + chks, fetchers, err := c.GetChunkRefs(ctx, from, through, allMatchers...) if err != nil { return nil, err } @@ -115,11 +122,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } chunks := chks[0] - // Filter out chunks that are not in the selected time range. - filtered, keys := filterChunksByTime(from, through, chunks) - level.Debug(log).Log("chunks-post-filtering", len(chunks)) - chunksPerQuery.Observe(float64(len(filtered))) - + fetcher := fetchers[0] // Protect ourselves against OOMing. maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery { @@ -129,7 +132,8 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } // Now fetch the actual chunk data from Memcache / S3 - allChunks, err := c.FetchChunks(ctx, filtered, keys) + keys := keysFromChunks(chunks) + allChunks, err := fetcher.FetchChunks(ctx, chunks, keys) if err != nil { level.Error(log).Log("msg", "FetchChunks", "err", err) return nil, err @@ -144,6 +148,11 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time log, ctx := spanlogger.New(ctx, "SeriesStore.GetChunkRefs") defer log.Span.Finish() + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, nil, err + } + // Validate the query is within reasonable bounds. metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers) if err != nil { @@ -171,12 +180,16 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time } level.Debug(log).Log("chunk-ids", len(chunkIDs)) - chunks, err := c.convertChunkIDsToChunks(ctx, chunkIDs) + chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) if err != nil { level.Error(log).Log("op", "convertChunkIDsToChunks", "err", err) return nil, nil, err } + chunks = filterChunksByTime(from, through, chunks) + level.Debug(log).Log("chunks-post-filtering", len(chunks)) + chunksPerQuery.Observe(float64(len(chunks))) + return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil } @@ -215,6 +228,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from var preIntersectionCount int var lastErr error var cardinalityExceededErrors int + var cardinalityExceededError CardinalityExceededError for i := 0; i < len(matchers); i++ { select { case incoming := <-incomingIDs: @@ -229,8 +243,9 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from // series and the other returns only 10 (a few), we don't lookup the first one at all. // We just manually filter through the 10 series again using "filterChunksByMatchers", // saving us from looking up and intersecting a lot of series. - if err == ErrCardinalityExceeded { + if e, ok := err.(CardinalityExceededError); ok { cardinalityExceededErrors++ + cardinalityExceededError = e } else { lastErr = err } @@ -239,7 +254,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from // But if every single matcher returns a lot of series, then it makes sense to abort the query. if cardinalityExceededErrors == len(matchers) { - return nil, ErrCardinalityExceeded + return nil, cardinalityExceededError } else if lastErr != nil { return nil, lastErr } @@ -260,11 +275,14 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, } var queries []IndexQuery + var labelName string if matcher == nil { queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName)) } else if matcher.Type != labels.MatchEqual { + labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name)) } else { + labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value)) } if err != nil { @@ -273,7 +291,11 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, level.Debug(log).Log("queries", len(queries)) entries, err := c.lookupEntriesByQueries(ctx, queries) - if err != nil { + if e, ok := err.(CardinalityExceededError); ok { + e.MetricName = metricName + e.LabelName = labelName + return nil, e + } else if err != nil { return nil, err } level.Debug(log).Log("entries", len(entries)) @@ -329,21 +351,16 @@ func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error { // PutOne implements ChunkStore func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return err - } - chunks := []Chunk{chunk} - err = c.storage.PutChunks(ctx, chunks) + err := c.storage.PutChunks(ctx, chunks) if err != nil { return err } c.writeBackCache(ctx, chunks) - writeReqs, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunk) + writeReqs, keysToCache, err := c.calculateIndexEntries(from, through, chunk) if err != nil { return err } @@ -358,7 +375,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun } // calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. -func (c *seriesStore) calculateIndexEntries(userID string, from, through model.Time, chunk Chunk) (WriteBatch, []string, error) { +func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chunk) (WriteBatch, []string, error) { seenIndexEntries := map[string]struct{}{} entries := []IndexEntry{} keysToCache := []string{} @@ -368,7 +385,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T return nil, nil, err } - keys := c.schema.GetLabelEntryCacheKeys(from, through, userID, chunk.Metric) + keys := c.schema.GetLabelEntryCacheKeys(from, through, chunk.UserID, chunk.Metric) cacheKeys := make([]string, 0, len(keys)) // Keys which translate to the strings stored in the cache. for _, key := range keys { @@ -379,7 +396,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T _, _, missing := c.writeDedupeCache.Fetch(context.Background(), cacheKeys) if len(missing) != 0 { - labelEntries, err := c.schema.GetLabelWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + labelEntries, err := c.schema.GetLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) if err != nil { return nil, nil, err } @@ -388,7 +405,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T keysToCache = missing } - chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) if err != nil { return nil, nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go index 1469e92d75ed5..47e90fe44e572 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go @@ -40,5 +40,6 @@ var Fixtures = []testutils.Fixture{ func defaultLimits() (*validation.Overrides, error) { var defaults validation.Limits flagext.DefaultValues(&defaults) + defaults.CardinalityLimit = 5 return validation.NewOverrides(defaults) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go index c4df850b885b0..5513a5d6e83db 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go @@ -88,7 +88,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind batches, misses := s.cacheFetch(ctx, keys) for _, batch := range batches { if cardinalityLimit > 0 && batch.Cardinality > cardinalityLimit { - return chunk.ErrCardinalityExceeded + return chunk.CardinalityExceededError{ + Size: batch.Cardinality, + Limit: cardinalityLimit, + } } queries := queriesByKey[batch.Key] @@ -156,7 +159,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind if cardinalityLimit > 0 && cardinality > cardinalityLimit { batch.Cardinality = cardinality batch.Entries = nil - cardinalityErr = chunk.ErrCardinalityExceeded + cardinalityErr = chunk.CardinalityExceededError{ + Size: cardinality, + Limit: cardinalityLimit, + } } keys = append(keys, key) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go index a0fd1b41406ac..30d3cd1764810 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go @@ -28,11 +28,9 @@ type Config struct { BoltDBConfig local.BoltDBConfig `yaml:"boltdb"` FSConfig local.FSConfig `yaml:"filesystem"` - IndexCacheSize int IndexCacheValidity time.Duration - memcacheClient cache.MemcachedClientConfig - indexQueriesCacheConfig cache.Config + IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config,omitempty"` } // RegisterFlags adds the flags required to configure this flag set. @@ -44,43 +42,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.BoltDBConfig.RegisterFlags(f) cfg.FSConfig.RegisterFlags(f) - // Deprecated flags!! - f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Deprecated: Use -store.index-cache-read.*; Size of in-memory index cache, 0 to disable.") - cfg.memcacheClient.RegisterFlagsWithPrefix("index.", "Deprecated: Use -store.index-cache-read.*;", f) - - cfg.indexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f) + cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f) f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.") } // NewStore makes the storage clients based on the configuration. func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (chunk.Store, error) { - var err error - - // Building up from deprecated flags. - var caches []cache.Cache - if cfg.IndexCacheSize > 0 { - fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize})) - caches = append(caches, fifocache) - } - if cfg.memcacheClient.Host != "" { - client := cache.NewMemcachedClient(cfg.memcacheClient) - memcache := cache.Instrument("memcache-index", cache.NewMemcached(cache.MemcachedConfig{ - Expiration: cfg.IndexCacheValidity, - }, client, "memcache-index")) - caches = append(caches, cache.NewBackground("memcache-index", cache.BackgroundConfig{ - WriteBackGoroutines: 10, - WriteBackBuffer: 100, - }, memcache)) - } - - var tieredCache cache.Cache - if len(caches) > 0 { - tieredCache = cache.NewTiered(caches) - } else { - tieredCache, err = cache.New(cfg.indexQueriesCacheConfig) - if err != nil { - return nil, err - } + tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig) + if err != nil { + return nil, err } // Cache is shared by multiple stores, which means they will try and Stop diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go index 4c99a77fc9abc..b32034001ffee 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go @@ -110,48 +110,6 @@ func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) { f.Int64Var(&cfg.InactiveReadScaleLastN, argPrefix+".inactive-read-throughput.scale-last-n", 4, "Number of last inactive tables to enable read autoscale.") } -// Tags is a string-string map that implements flag.Value. -type Tags map[string]string - -// String implements flag.Value -func (ts Tags) String() string { - if ts == nil { - return "" - } - - return fmt.Sprintf("%v", map[string]string(ts)) -} - -// Set implements flag.Value -func (ts *Tags) Set(s string) error { - if *ts == nil { - *ts = map[string]string{} - } - - parts := strings.SplitN(s, "=", 2) - if len(parts) != 2 { - return fmt.Errorf("tag must of the format key=value") - } - (*ts)[parts[0]] = parts[1] - return nil -} - -// Equals returns true is other matches ts. -func (ts Tags) Equals(other Tags) bool { - if len(ts) != len(other) { - return false - } - - for k, v1 := range ts { - v2, ok := other[k] - if !ok || v1 != v2 { - return false - } - } - - return true -} - // TableManager creates and manages the provisioned throughput on DynamoDB tables type TableManager struct { client TableClient @@ -333,8 +291,12 @@ func (m *TableManager) partitionTables(ctx context.Context, descriptions []Table // Ensure we only delete tables which have a prefix managed by Cortex. tablePrefixes := map[string]struct{}{} for _, cfg := range m.schemaCfg.Configs { - tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} - tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + if cfg.IndexTables.Prefix != "" { + tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} + } + if cfg.ChunkTables.Prefix != "" { + tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + } } for existingTable := range existingTables { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/tags.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/tags.go new file mode 100644 index 0000000000000..b51849ceb2782 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/tags.go @@ -0,0 +1,58 @@ +package chunk + +import ( + "fmt" + "strings" +) + +// Tags is a string-string map that implements flag.Value. +type Tags map[string]string + +// String implements flag.Value +func (ts Tags) String() string { + if ts == nil { + return "" + } + + return fmt.Sprintf("%v", map[string]string(ts)) +} + +// Set implements flag.Value +func (ts *Tags) Set(s string) error { + if *ts == nil { + *ts = map[string]string{} + } + + parts := strings.SplitN(s, "=", 2) + if len(parts) != 2 { + return fmt.Errorf("tag must of the format key=value") + } + (*ts)[parts[0]] = parts[1] + return nil +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (ts *Tags) UnmarshalYAML(unmarshal func(interface{}) error) error { + var m map[string]string + if err := unmarshal(&m); err != nil { + return err + } + *ts = Tags(m) + return nil +} + +// Equals returns true is other matches ts. +func (ts Tags) Equals(other Tags) bool { + if len(ts) != len(other) { + return false + } + + for k, v1 := range ts { + v2, ok := other[k] + if !ok || v1 != v2 { + return false + } + } + + return true +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go index 54679185f1c54..dc30f7ff540e6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go @@ -75,9 +75,4 @@ type Config struct { // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlags("ingester.client", f) - - // Deprecated. - f.Int("ingester.client.max-recv-message-size", 64*1024*1024, "DEPRECATED. Maximum message size, in bytes, this client will receive.") - f.Bool("ingester.client.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters.") - f.Bool("distributor.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead") } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go b/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go index 9d470b5051dcb..5d798612ce4c1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go @@ -31,11 +31,5 @@ func (v *URLValue) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := unmarshal(&s); err != nil { return err } - - u, err := url.Parse(s) - if err != nil { - return err - } - v.URL = u - return nil + return v.Set(s) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go b/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go index 12326db9b2d83..61d825f8a1bf8 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" "github.com/cortexproject/cortex/pkg/util" @@ -41,3 +42,13 @@ func (s *SpanLogger) Log(kvps ...interface{}) error { s.Span.LogFields(fields...) return nil } + +// Error sets error flag and logs the error, if non-nil. Returns the err passed in. +func (s *SpanLogger) Error(err error) error { + if err == nil { + return nil + } + ext.Error.Set(s.Span, true) + s.Span.LogFields(otlog.Error(err)) + return err +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go index a2553f7de813a..7969aed12fe53 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go @@ -18,6 +18,10 @@ var overridesReloadSuccess = promauto.NewGauge(prometheus.GaugeOpts{ Help: "Whether the last overrides reload attempt was successful.", }) +func init() { + overridesReloadSuccess.Set(1) // Default to 1 +} + // When we load YAML from disk, we want the various per-customer limits // to default to any values specified on the command line, not default // command line values. This global contains those values. I (Tom) cannot