From ad2c37066b36dd06ac0b05dd6dd375d2ab959374 Mon Sep 17 00:00:00 2001 From: fuling Date: Wed, 8 Dec 2021 17:05:31 +0800 Subject: [PATCH 01/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/storage/chunk/cache/background.go | 5 ++-- pkg/storage/chunk/cache/cache.go | 4 +-- pkg/storage/chunk/cache/cache_gen.go | 8 +++--- pkg/storage/chunk/cache/cache_test.go | 6 ++--- pkg/storage/chunk/cache/fifo_cache.go | 5 ++-- pkg/storage/chunk/cache/instrumented.go | 28 +++++++++---------- pkg/storage/chunk/cache/memcached.go | 30 ++++++++++++--------- pkg/storage/chunk/cache/memcached_test.go | 4 +-- pkg/storage/chunk/cache/mock.go | 5 ++-- pkg/storage/chunk/cache/redis_cache.go | 5 ++-- pkg/storage/chunk/cache/redis_cache_test.go | 4 +-- pkg/storage/chunk/cache/snappy.go | 12 ++++----- pkg/storage/chunk/cache/tiered.go | 25 ++++++++++++----- pkg/storage/chunk/cache/tiered_test.go | 2 +- pkg/storage/chunk/chunk_store_utils.go | 8 +++--- pkg/storage/chunk/series_store.go | 4 +-- 16 files changed, 89 insertions(+), 66 deletions(-) diff --git a/pkg/storage/chunk/cache/background.go b/pkg/storage/chunk/cache/background.go index db679200fc163..2ed3db620ab9f 100644 --- a/pkg/storage/chunk/cache/background.go +++ b/pkg/storage/chunk/cache/background.go @@ -81,7 +81,7 @@ func (c *backgroundCache) Stop() { const keysPerBatch = 100 // Store writes keys for the cache in the background. -func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) error { for len(keys) > 0 { num := keysPerBatch if num > len(keys) { @@ -101,11 +101,12 @@ func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byt if sp != nil { sp.LogFields(otlog.Int("dropped", num)) } - return // queue is full; give up + return nil// queue is full; give up } keys = keys[num:] bufs = bufs[num:] } + return nil } func (c *backgroundCache) writeBackLoop() { diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 10418524bbaf7..e3ee03d373252 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -18,8 +18,8 @@ import ( // Whatsmore, we found partially successful Fetchs were often treated as failed // when they returned an error. type Cache interface { - Store(ctx context.Context, key []string, buf [][]byte) - Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) + Store(ctx context.Context, key []string, buf [][]byte) error + Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) Stop() } diff --git a/pkg/storage/chunk/cache/cache_gen.go b/pkg/storage/chunk/cache/cache_gen.go index 3fd151db1d4fb..022cb5194b1c4 100644 --- a/pkg/storage/chunk/cache/cache_gen.go +++ b/pkg/storage/chunk/cache/cache_gen.go @@ -21,17 +21,17 @@ func NewCacheGenNumMiddleware(downstreamCache Cache) Cache { } // Store adds cache gen number to keys before calling Store method of downstream cache. 
-func (c GenNumMiddleware) Store(ctx context.Context, keys []string, buf [][]byte) { +func (c GenNumMiddleware) Store(ctx context.Context, keys []string, buf [][]byte) error { keys = addCacheGenNumToCacheKeys(ctx, keys) - c.downstreamCache.Store(ctx, keys, buf) + return c.downstreamCache.Store(ctx, keys, buf) } // Fetch adds cache gen number to keys before calling Fetch method of downstream cache. // It also removes gen number before responding back with found and missing keys to make sure consumer of response gets to see same keys. -func (c GenNumMiddleware) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) { +func (c GenNumMiddleware) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { keys = addCacheGenNumToCacheKeys(ctx, keys) - found, bufs, missing = c.downstreamCache.Fetch(ctx, keys) + found, bufs, missing, err = c.downstreamCache.Fetch(ctx, keys) found = removeCacheGenNumFromKeys(ctx, found) missing = removeCacheGenNumFromKeys(ctx, missing) diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index aa3d0d3d094d9..872d624d2ccb0 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -82,7 +82,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch index := rand.Intn(len(keys)) key := keys[index] - found, bufs, missingKeys := cache.Fetch(context.Background(), []string{key}) + found, bufs, missingKeys, _ := cache.Fetch(context.Background(), []string{key}) require.Len(t, found, 1) require.Len(t, bufs, 1) require.Len(t, missingKeys, 0) @@ -97,7 +97,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) { // test getting them all - found, bufs, missingKeys := cache.Fetch(context.Background(), keys) + found, bufs, missingKeys, _ := cache.Fetch(context.Background(), keys) require.Len(t, found, len(keys)) require.Len(t, bufs, len(keys)) require.Len(t, missingKeys, 0) @@ -134,7 +134,7 @@ func (a byExternalKey) Less(i, j int) bool { return a[i].ExternalKey() < a[j].Ex func testCacheMiss(t *testing.T, cache cache.Cache) { for i := 0; i < 100; i++ { key := strconv.Itoa(rand.Int()) // arbitrary key which should fail: no chunk key is a single integer - found, bufs, missing := cache.Fetch(context.Background(), []string{key}) + found, bufs, missing, _ := cache.Fetch(context.Background(), []string{key}) require.Empty(t, found) require.Empty(t, bufs) require.Len(t, missing, 1) diff --git a/pkg/storage/chunk/cache/fifo_cache.go b/pkg/storage/chunk/cache/fifo_cache.go index c4f969c8b3484..4879c92648622 100644 --- a/pkg/storage/chunk/cache/fifo_cache.go +++ b/pkg/storage/chunk/cache/fifo_cache.go @@ -180,7 +180,7 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l } // Fetch implements Cache. -func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) { +func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys)) for _, key := range keys { val, ok := c.Get(ctx, key) @@ -196,7 +196,7 @@ func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, b } // Store implements Cache. 
-func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) { +func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) error { c.entriesAdded.Inc() c.lock.Lock() @@ -205,6 +205,7 @@ func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) { for i := range keys { c.put(keys[i], values[i]) } + return nil } // Stop implements Cache. diff --git a/pkg/storage/chunk/cache/instrumented.go b/pkg/storage/chunk/cache/instrumented.go index c155b36c33dea..adc8768102766 100644 --- a/pkg/storage/chunk/cache/instrumented.go +++ b/pkg/storage/chunk/cache/instrumented.go @@ -64,35 +64,35 @@ type instrumentedCache struct { requestDuration *instr.HistogramCollector } -func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) error { for j := range bufs { i.storedValueSize.Observe(float64(len(bufs[j]))) } method := i.name + ".store" - _ = instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { + return instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys", len(keys))) - i.Cache.Store(ctx, keys, bufs) - return nil + err := i.Cache.Store(ctx, keys, bufs) + return err }) } -func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) { +func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { var ( - found []string - bufs [][]byte - missing []string - method = i.name + ".fetch" + found []string + bufs [][]byte + missing []string + fetchErr error = nil + method = i.name + ".fetch" ) - _ = instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { + err := instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys requested", len(keys))) - - found, bufs, missing = i.Cache.Fetch(ctx, keys) + found, bufs, missing, fetchErr = i.Cache.Fetch(ctx, keys) sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(keys)-len(found))) - return nil + return fetchErr }) i.fetchedKeys.Add(float64(len(keys))) @@ -101,7 +101,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, i.fetchedValueSize.Observe(float64(len(bufs[j]))) } - return found, bufs, missing + return found, bufs, missing, err } func (i *instrumentedCache) Stop() { diff --git a/pkg/storage/chunk/cache/memcached.go b/pkg/storage/chunk/cache/memcached.go index ab573cc80f2ff..8417251e7e091 100644 --- a/pkg/storage/chunk/cache/memcached.go +++ b/pkg/storage/chunk/cache/memcached.go @@ -80,7 +80,7 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg res := &result{ batchID: input.batchID, } - res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys) + res.found, res.bufs, res.missed, res.err = c.fetch(input.ctx, input.keys) input.resultCh <- res } @@ -102,6 +102,7 @@ type result struct { found []string bufs [][]byte missed []string + err error batchID int // For ordering results. } @@ -120,21 +121,20 @@ func memcacheStatusCode(err error) string { } // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. 
-func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { +func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { if c.cfg.BatchSize == 0 { - found, bufs, missed = c.fetch(ctx, keys) + found, bufs, missed, err = c.fetch(ctx, keys) return } start := time.Now() - found, bufs, missed = c.fetchKeysBatched(ctx, keys) - c.requestDuration.After(ctx, "Memcache.GetBatched", "200", start) + found, bufs, missed, err = c.fetchKeysBatched(ctx, keys) + c.requestDuration.After(ctx, "Memcache.GetBatched", memcacheStatusCode(err), start) return } -func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { +func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { var ( - err error start = time.Now() items map[string]*memcache.Item ) @@ -146,7 +146,7 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b "keys requested", len(keys), "err", err, ) - return found, bufs, keys + return found, bufs, keys, err } for _, key := range keys { @@ -161,7 +161,7 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b return } -func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { +func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { resultsCh := make(chan *result) batchSize := c.cfg.BatchSize @@ -196,15 +196,19 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found found = append(found, result.found...) bufs = append(bufs, result.bufs...) missed = append(missed, result.missed...) + if result.err != nil { + err = result.err + } } return } // Store stores the key in the cache. -func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) error { + var err error = nil for i := range keys { - err := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error { + cacheErr := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error { item := memcache.Item{ Key: keys[i], Value: bufs[i], @@ -212,10 +216,12 @@ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) { } return c.memcache.Set(&item) }) - if err != nil { + if cacheErr != nil { level.Error(c.logger).Log("msg", "failed to put to memcached", "name", c.name, "err", err) + err = cacheErr } } + return err } // Stop does nothing. 
diff --git a/pkg/storage/chunk/cache/memcached_test.go b/pkg/storage/chunk/cache/memcached_test.go index 129500d2afbdb..d641225ff0e39 100644 --- a/pkg/storage/chunk/cache/memcached_test.go +++ b/pkg/storage/chunk/cache/memcached_test.go @@ -54,7 +54,7 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) { } memcache.Store(ctx, keys, bufs) - found, bufs, missing := memcache.Fetch(ctx, keysIncMissing) + found, bufs, missing, _ := memcache.Fetch(ctx, keysIncMissing) for i := 0; i < numKeys; i++ { if i%5 == 0 { require.Equal(t, fmt.Sprint(i), missing[0]) @@ -129,7 +129,7 @@ func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) { memcache.Store(ctx, keys, bufs) for i := 0; i < 10; i++ { - found, bufs, missing := memcache.Fetch(ctx, keysIncMissing) + found, bufs, missing, _ := memcache.Fetch(ctx, keysIncMissing) require.Equal(t, len(found), len(bufs)) for i := range found { diff --git a/pkg/storage/chunk/cache/mock.go b/pkg/storage/chunk/cache/mock.go index 6503aea80dc0c..7cc0cad7a0606 100644 --- a/pkg/storage/chunk/cache/mock.go +++ b/pkg/storage/chunk/cache/mock.go @@ -10,15 +10,16 @@ type mockCache struct { cache map[string][]byte } -func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) { +func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error { m.Lock() defer m.Unlock() for i := range keys { m.cache[keys[i]] = bufs[i] } + return nil } -func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) { +func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { m.Lock() defer m.Unlock() for _, key := range keys { diff --git a/pkg/storage/chunk/cache/redis_cache.go b/pkg/storage/chunk/cache/redis_cache.go index 9e86cd54f4386..6d7690a52fc2e 100644 --- a/pkg/storage/chunk/cache/redis_cache.go +++ b/pkg/storage/chunk/cache/redis_cache.go @@ -31,7 +31,7 @@ func NewRedisCache(name string, redisClient *RedisClient, logger log.Logger) *Re } // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. -func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { +func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { data, err := c.redis.MGet(ctx, keys) if err != nil { level.Error(c.logger).Log("msg", "failed to get from redis", "name", c.name, "err", err) @@ -51,11 +51,12 @@ func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, } // Store stores the key in the cache. -func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) error { err := c.redis.MSet(ctx, keys, bufs) if err != nil { level.Error(c.logger).Log("msg", "failed to put to redis", "name", c.name, "err", err) } + return err } // Stop stops the redis client. 
diff --git a/pkg/storage/chunk/cache/redis_cache_test.go b/pkg/storage/chunk/cache/redis_cache_test.go index 0032bff46f786..59d80062cbf92 100644 --- a/pkg/storage/chunk/cache/redis_cache_test.go +++ b/pkg/storage/chunk/cache/redis_cache_test.go @@ -31,7 +31,7 @@ func TestRedisCache(t *testing.T) { c.Store(ctx, keys, bufs) // test hits - found, data, missed := c.Fetch(ctx, keys) + found, data, missed, _ := c.Fetch(ctx, keys) require.Len(t, found, nHit) require.Len(t, missed, 0) @@ -41,7 +41,7 @@ func TestRedisCache(t *testing.T) { } // test misses - found, _, missed = c.Fetch(ctx, miss) + found, _, missed, _ = c.Fetch(ctx, miss) require.Len(t, found, 0) require.Len(t, missed, nMiss) diff --git a/pkg/storage/chunk/cache/snappy.go b/pkg/storage/chunk/cache/snappy.go index b19b6dc4a0b84..9fa708888d367 100644 --- a/pkg/storage/chunk/cache/snappy.go +++ b/pkg/storage/chunk/cache/snappy.go @@ -21,27 +21,27 @@ func NewSnappy(next Cache, logger log.Logger) Cache { } } -func (s *snappyCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (s *snappyCache) Store(ctx context.Context, keys []string, bufs [][]byte) error { cs := make([][]byte, 0, len(bufs)) for _, buf := range bufs { c := snappy.Encode(nil, buf) cs = append(cs, c) } - s.next.Store(ctx, keys, cs) + return s.next.Store(ctx, keys, cs) } -func (s *snappyCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) { - found, bufs, missing := s.next.Fetch(ctx, keys) +func (s *snappyCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { + found, bufs, missing, err := s.next.Fetch(ctx, keys) ds := make([][]byte, 0, len(bufs)) for _, buf := range bufs { d, err := snappy.Decode(nil, buf) if err != nil { level.Error(s.logger).Log("msg", "failed to decode cache entry", "err", err) - return nil, nil, keys + return nil, nil, keys, err } ds = append(ds, d) } - return found, ds, missing + return found, ds, missing, err } func (s *snappyCache) Stop() { diff --git a/pkg/storage/chunk/cache/tiered.go b/pkg/storage/chunk/cache/tiered.go index bb2012ac3181b..c0fa210ca7d05 100644 --- a/pkg/storage/chunk/cache/tiered.go +++ b/pkg/storage/chunk/cache/tiered.go @@ -19,16 +19,22 @@ func IsEmptyTieredCache(cache Cache) bool { return ok && len(c) == 0 } -func (t tiered) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (t tiered) Store(ctx context.Context, keys []string, bufs [][]byte) error { + var err error = nil for _, c := range []Cache(t) { - c.Store(ctx, keys, bufs) + cacheErr := c.Store(ctx, keys, bufs) + if cacheErr != nil { + err = cacheErr + } } + return err } -func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) { +func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { found := make(map[string][]byte, len(keys)) missing := keys previousCaches := make([]Cache, 0, len(t)) + var err error = nil for _, c := range []Cache(t) { var ( @@ -36,9 +42,14 @@ func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, [ passBufs [][]byte ) - passKeys, passBufs, missing = c.Fetch(ctx, missing) - tiered(previousCaches).Store(ctx, passKeys, passBufs) - + passKeys, passBufs, missing, err = c.Fetch(ctx, missing) + if err != nil { + return passKeys, passBufs, missing, err + } + err := tiered(previousCaches).Store(ctx, passKeys, passBufs) + if err != nil { + return passKeys, passBufs, missing, err + } for i, key := range passKeys { found[key] = passBufs[i] } @@ -59,7 +70,7 @@ func (t tiered) 
Fetch(ctx context.Context, keys []string) ([]string, [][]byte, [ } } - return resultKeys, resultBufs, missing + return resultKeys, resultBufs, missing, nil } func (t tiered) Stop() { diff --git a/pkg/storage/chunk/cache/tiered_test.go b/pkg/storage/chunk/cache/tiered_test.go index abb1a6274894e..941e42ac2fe78 100644 --- a/pkg/storage/chunk/cache/tiered_test.go +++ b/pkg/storage/chunk/cache/tiered_test.go @@ -27,7 +27,7 @@ func TestTiered(t *testing.T) { level1.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("hello")}) level2.Store(context.Background(), []string{"key2"}, [][]byte{[]byte("world")}) - keys, bufs, missing := cache.Fetch(context.Background(), []string{"key1", "key2", "key3"}) + keys, bufs, missing, _ := cache.Fetch(context.Background(), []string{"key1", "key2", "key3"}) require.Equal(t, []string{"key1", "key2"}, keys) require.Equal(t, [][]byte{[]byte("hello"), []byte("world")}, bufs) require.Equal(t, []string{"key3"}, missing) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 6561364c7f401..9dcfa1aebb1b1 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -146,12 +146,14 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string defer log.Span.Finish() // Now fetch the actual chunk data from Memcache / S3 - cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys) - - fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs) + cacheHits, cacheBufs, _, err := c.cache.Fetch(ctx, keys) if err != nil { level.Warn(log).Log("msg", "error fetching from cache", "err", err) } + fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs) + if err != nil { + level.Warn(log).Log("msg", "error process response from cache", "err", err) + } var fromStorage []Chunk if len(missing) > 0 { diff --git a/pkg/storage/chunk/series_store.go b/pkg/storage/chunk/series_store.go index 44be57105f7e9..fc1f7de6d5f3b 100644 --- a/pkg/storage/chunk/series_store.go +++ b/pkg/storage/chunk/series_store.go @@ -424,7 +424,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun writeChunk := true // If this chunk is in cache it must already be in the database so we don't need to write it again - found, _, _ := c.fetcher.cache.Fetch(ctx, []string{chunk.ExternalKey()}) + found, _, _, _ := c.fetcher.cache.Fetch(ctx, []string{chunk.ExternalKey()}) if len(found) > 0 { writeChunk = false dedupedChunksTotal.Inc() @@ -491,7 +491,7 @@ func (c *seriesStore) calculateIndexEntries(ctx context.Context, from, through m if err != nil { return nil, nil, err } - _, _, missing := c.writeDedupeCache.Fetch(ctx, keys) + _, _, missing, _ := c.writeDedupeCache.Fetch(ctx, keys) // keys and labelEntries are matched in order, but Fetch() may // return missing keys in any order so check against all of them. 
for _, missingKey := range missing { From 0a8a058652f51ff93a9bb6aabb7b3211f7bad51d Mon Sep 17 00:00:00 2001 From: fuling Date: Wed, 8 Dec 2021 17:34:05 +0800 Subject: [PATCH 02/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"=20#4891?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/querier/queryrange/roundtrip.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index b47817332ffce..5705eaf0586f0 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -1,12 +1,14 @@ package queryrange import ( + "context" "errors" "flag" "net/http" "strings" "time" + cortexcache "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/cortexproject/cortex/pkg/tenant" "github.com/go-kit/log" @@ -404,7 +406,7 @@ func NewMetricTripperware( if err != nil { return nil, nil, err } - c = cache + c = transform(cache) queryRangeMiddleware = append( queryRangeMiddleware, queryrange.InstrumentMiddleware("results_cache", instrumentMetrics), @@ -450,6 +452,28 @@ func NewMetricTripperware( }, c, nil } +func transform(c cortexcache.Cache) cache.Cache { + return &roundtripCache{ + cache: c, + } +} + +type roundtripCache struct { + cache cortexcache.Cache +} + +func (r *roundtripCache) Store(ctx context.Context, key []string, buf [][]byte) error { + r.cache.Store(ctx, key, buf) + return nil +} +func (r *roundtripCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { + found, bufs, missing = r.cache.Fetch(ctx, keys) + return +} +func (r *roundtripCache) Stop() { + r.cache.Stop() +} + // NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries func NewInstantMetricTripperware( cfg Config, From c00271024c4edcfb72742ca57317bca68a7e3fb9 Mon Sep 17 00:00:00 2001 From: fuling Date: Wed, 8 Dec 2021 18:52:56 +0800 Subject: [PATCH 03/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"=20#4891?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/storage/chunk/cache/background.go | 2 +- pkg/storage/chunk/cache/cache_test.go | 3 ++- pkg/storage/chunk/cache/fifo_cache_test.go | 9 ++++++--- pkg/storage/chunk/cache/instrumented.go | 2 +- pkg/storage/chunk/cache/memcached.go | 2 +- pkg/storage/chunk/cache/memcached_test.go | 6 ++++-- pkg/storage/chunk/cache/tiered.go | 4 ++-- pkg/storage/chunk/cache/tiered_test.go | 6 ++++-- pkg/storage/chunk/chunk_store_utils.go | 5 ++++- pkg/storage/chunk/series_store.go | 5 ++++- pkg/storage/chunk/storage/caching_index_client.go | 2 +- 11 files changed, 30 insertions(+), 16 deletions(-) diff --git a/pkg/storage/chunk/cache/background.go b/pkg/storage/chunk/cache/background.go index 2ed3db620ab9f..b131e96bc277a 100644 --- a/pkg/storage/chunk/cache/background.go +++ b/pkg/storage/chunk/cache/background.go @@ -101,7 +101,7 @@ func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byt if sp != nil { sp.LogFields(otlog.Int("dropped", 
num)) } - return nil// queue is full; give up + return nil // queue is full; give up } keys = keys[num:] bufs = bufs[num:] diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 872d624d2ccb0..106e766ffe23c 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -73,7 +73,8 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { chunks = append(chunks, cleanChunk) } - cache.Store(context.Background(), keys, bufs) + err := cache.Store(context.Background(), keys, bufs) + require.NoError(t, err) return keys, chunks } diff --git a/pkg/storage/chunk/cache/fifo_cache_test.go b/pkg/storage/chunk/cache/fifo_cache_test.go index 50aee975a3c27..752c1a28cd4db 100644 --- a/pkg/storage/chunk/cache/fifo_cache_test.go +++ b/pkg/storage/chunk/cache/fifo_cache_test.go @@ -51,7 +51,8 @@ func TestFifoCacheEviction(t *testing.T) { keys = append(keys, key) values = append(values, value) } - c.Store(ctx, keys, values) + err := c.Store(ctx, keys, values) + require.NoError(t, err) require.Len(t, c.entries, cnt) assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(1)) @@ -93,7 +94,8 @@ func TestFifoCacheEviction(t *testing.T) { keys = append(keys, key) values = append(values, value) } - c.Store(ctx, keys, values) + err = c.Store(ctx, keys, values) + require.NoError(t, err) require.Len(t, c.entries, cnt) assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(2)) @@ -139,7 +141,8 @@ func TestFifoCacheEviction(t *testing.T) { copy(value, vstr) values = append(values, value) } - c.Store(ctx, keys, values) + err = c.Store(ctx, keys, values) + require.NoError(t, err) require.Len(t, c.entries, cnt) for i := cnt; i < cnt+evicted; i++ { diff --git a/pkg/storage/chunk/cache/instrumented.go b/pkg/storage/chunk/cache/instrumented.go index adc8768102766..7f404a32e0d85 100644 --- a/pkg/storage/chunk/cache/instrumented.go +++ b/pkg/storage/chunk/cache/instrumented.go @@ -83,7 +83,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, found []string bufs [][]byte missing []string - fetchErr error = nil + fetchErr error method = i.name + ".fetch" ) diff --git a/pkg/storage/chunk/cache/memcached.go b/pkg/storage/chunk/cache/memcached.go index 8417251e7e091..9f1a727b35e22 100644 --- a/pkg/storage/chunk/cache/memcached.go +++ b/pkg/storage/chunk/cache/memcached.go @@ -206,7 +206,7 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found // Store stores the key in the cache. 
func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) error { - var err error = nil + var err error for i := range keys { cacheErr := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error { item := memcache.Item{ diff --git a/pkg/storage/chunk/cache/memcached_test.go b/pkg/storage/chunk/cache/memcached_test.go index d641225ff0e39..d16a906619111 100644 --- a/pkg/storage/chunk/cache/memcached_test.go +++ b/pkg/storage/chunk/cache/memcached_test.go @@ -52,7 +52,8 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) { keys = append(keys, fmt.Sprint(i)) bufs = append(bufs, []byte(fmt.Sprint(i))) } - memcache.Store(ctx, keys, bufs) + err := memcache.Store(ctx, keys, bufs) + require.NoError(t, err) found, bufs, missing, _ := memcache.Fetch(ctx, keysIncMissing) for i := 0; i < numKeys; i++ { @@ -126,7 +127,8 @@ func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) { keys = append(keys, fmt.Sprint(i)) bufs = append(bufs, []byte(fmt.Sprint(i))) } - memcache.Store(ctx, keys, bufs) + err := memcache.Store(ctx, keys, bufs) + require.NoError(t, err) for i := 0; i < 10; i++ { found, bufs, missing, _ := memcache.Fetch(ctx, keysIncMissing) diff --git a/pkg/storage/chunk/cache/tiered.go b/pkg/storage/chunk/cache/tiered.go index c0fa210ca7d05..ea8b7192108b0 100644 --- a/pkg/storage/chunk/cache/tiered.go +++ b/pkg/storage/chunk/cache/tiered.go @@ -20,7 +20,7 @@ func IsEmptyTieredCache(cache Cache) bool { } func (t tiered) Store(ctx context.Context, keys []string, bufs [][]byte) error { - var err error = nil + var err error for _, c := range []Cache(t) { cacheErr := c.Store(ctx, keys, bufs) if cacheErr != nil { @@ -34,7 +34,7 @@ func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, [ found := make(map[string][]byte, len(keys)) missing := keys previousCaches := make([]Cache, 0, len(t)) - var err error = nil + var err error for _, c := range []Cache(t) { var ( diff --git a/pkg/storage/chunk/cache/tiered_test.go b/pkg/storage/chunk/cache/tiered_test.go index 941e42ac2fe78..e024fe9ab096f 100644 --- a/pkg/storage/chunk/cache/tiered_test.go +++ b/pkg/storage/chunk/cache/tiered_test.go @@ -24,8 +24,10 @@ func TestTiered(t *testing.T) { level1, level2 := cache.NewMockCache(), cache.NewMockCache() cache := cache.NewTiered([]cache.Cache{level1, level2}) - level1.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("hello")}) - level2.Store(context.Background(), []string{"key2"}, [][]byte{[]byte("world")}) + err := level1.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("hello")}) + require.NoError(t, err) + err = level2.Store(context.Background(), []string{"key2"}, [][]byte{[]byte("world")}) + require.NoError(t, err) keys, bufs, missing, _ := cache.Fetch(context.Background(), []string{"key1", "key2", "key3"}) require.Equal(t, []string{"key1", "key2"}, keys) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 9dcfa1aebb1b1..b3c30d46cd140 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -192,7 +192,10 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { bufs = append(bufs, encoded) } - c.cache.Store(ctx, keys, bufs) + err := c.cache.Store(ctx, keys, bufs) + if err != nil { + level.Warn(util_log.Logger).Log("msg", "writeBackCache cache store fail", "err", err) + } return nil } diff --git a/pkg/storage/chunk/series_store.go 
b/pkg/storage/chunk/series_store.go index fc1f7de6d5f3b..f515bafc9b5fa 100644 --- a/pkg/storage/chunk/series_store.go +++ b/pkg/storage/chunk/series_store.go @@ -473,7 +473,10 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun } bufs := make([][]byte, len(keysToCache)) - c.writeDedupeCache.Store(ctx, keysToCache, bufs) + err = c.writeDedupeCache.Store(ctx, keysToCache, bufs) + if err != nil { + level.Warn(log).Log("msg", "could not Store store in write dedupe cache", "err", err) + } return nil } diff --git a/pkg/storage/chunk/storage/caching_index_client.go b/pkg/storage/chunk/storage/caching_index_client.go index 0533877f4ded7..deb15e5f5c10f 100644 --- a/pkg/storage/chunk/storage/caching_index_client.go +++ b/pkg/storage/chunk/storage/caching_index_client.go @@ -313,7 +313,7 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat // Look up the hashes in a single batch. If we get an error, we just "miss" all // of the keys. Eventually I want to push all the errors to the leafs of the cache // tree, to the caches only return found & missed. - foundHashes, bufs, _ := s.cache.Fetch(ctx, hashes) + foundHashes, bufs, _, _ := s.cache.Fetch(ctx, hashes) // Reverse the hash, unmarshal the index entries, check we got what we expected // and that its still valid. From bb7e447d465f15619cbad53444034afb24b58631 Mon Sep 17 00:00:00 2001 From: fuling Date: Wed, 8 Dec 2021 19:01:38 +0800 Subject: [PATCH 04/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"=20#4891?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/storage/chunk/storage/caching_index_client.go | 6 +++--- pkg/storage/chunk/storage/caching_index_client_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/storage/chunk/storage/caching_index_client.go b/pkg/storage/chunk/storage/caching_index_client.go index deb15e5f5c10f..2904d0aaecb28 100644 --- a/pkg/storage/chunk/storage/caching_index_client.go +++ b/pkg/storage/chunk/storage/caching_index_client.go @@ -270,7 +270,7 @@ func isChunksQuery(q chunk.IndexQuery) bool { return len(q.RangeValueStart) != 0 } -func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) { +func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) error { cachePuts.Add(float64(len(keys))) // We're doing the hashing to handle unicode and key len properly. 
@@ -283,12 +283,12 @@ func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batc if err != nil { level.Warn(s.logger).Log("msg", "error marshalling ReadBatch", "err", err) cacheEncodeErrs.Inc() - return + return err } bufs = append(bufs, out) } - s.cache.Store(ctx, hashed, bufs) + return s.cache.Store(ctx, hashed, bufs) } func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) { diff --git a/pkg/storage/chunk/storage/caching_index_client_test.go b/pkg/storage/chunk/storage/caching_index_client_test.go index 727373fe51c75..eda32ac81725e 100644 --- a/pkg/storage/chunk/storage/caching_index_client_test.go +++ b/pkg/storage/chunk/storage/caching_index_client_test.go @@ -270,9 +270,9 @@ type mockCache struct { cache.Cache } -func (m *mockCache) Store(ctx context.Context, keys []string, buf [][]byte) { +func (m *mockCache) Store(ctx context.Context, keys []string, buf [][]byte) error { m.storedKeys = append(m.storedKeys, keys...) - m.Cache.Store(ctx, keys, buf) + return m.Cache.Store(ctx, keys, buf) } func buildQueryKey(q chunk.IndexQuery) string { From 2ae9b5b2b7f4f1c4074222cfa94529a6e6e5daab Mon Sep 17 00:00:00 2001 From: fuling Date: Wed, 8 Dec 2021 22:54:34 +0800 Subject: [PATCH 05/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"=20#4891?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/storage/chunk/cache/background.go | 5 ++++- pkg/storage/chunk/cache/fifo_cache_test.go | 3 ++- pkg/storage/chunk/cache/instrumented.go | 2 +- pkg/storage/chunk/cache/redis_cache_test.go | 3 ++- pkg/storage/chunk/storage/caching_index_client.go | 8 ++++++-- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/storage/chunk/cache/background.go b/pkg/storage/chunk/cache/background.go index b131e96bc277a..edecfdec8f1f5 100644 --- a/pkg/storage/chunk/cache/background.go +++ b/pkg/storage/chunk/cache/background.go @@ -119,7 +119,10 @@ func (c *backgroundCache) writeBackLoop() { return } c.queueLength.Sub(float64(len(bgWrite.keys))) - c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs) + err := c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs) + if err != nil { + continue + } case <-c.quit: return diff --git a/pkg/storage/chunk/cache/fifo_cache_test.go b/pkg/storage/chunk/cache/fifo_cache_test.go index 752c1a28cd4db..ab2b5e3e36d6b 100644 --- a/pkg/storage/chunk/cache/fifo_cache_test.go +++ b/pkg/storage/chunk/cache/fifo_cache_test.go @@ -192,9 +192,10 @@ func TestFifoCacheExpiry(t *testing.T) { c := NewFifoCache(test.name, test.cfg, nil, log.NewNopLogger()) ctx := context.Background() - c.Store(ctx, + err := c.Store(ctx, []string{key1, key2, key4, key3, key2, key1}, [][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1}) + require.NoError(t, err) value, ok := c.Get(ctx, key1) require.True(t, ok) diff --git a/pkg/storage/chunk/cache/instrumented.go b/pkg/storage/chunk/cache/instrumented.go index 7f404a32e0d85..c1c9c66195f17 100644 --- a/pkg/storage/chunk/cache/instrumented.go +++ b/pkg/storage/chunk/cache/instrumented.go @@ -84,7 +84,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, bufs [][]byte missing []string fetchErr error - method = i.name + ".fetch" + method = i.name + ".fetch" ) err := instr.CollectedRequest(ctx, 
method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { diff --git a/pkg/storage/chunk/cache/redis_cache_test.go b/pkg/storage/chunk/cache/redis_cache_test.go index 59d80062cbf92..3048473dba911 100644 --- a/pkg/storage/chunk/cache/redis_cache_test.go +++ b/pkg/storage/chunk/cache/redis_cache_test.go @@ -28,7 +28,8 @@ func TestRedisCache(t *testing.T) { ctx := context.Background() - c.Store(ctx, keys, bufs) + err = c.Store(ctx, keys, bufs) + require.NoError(t, err) // test hits found, data, missed, _ := c.Fetch(ctx, keys) diff --git a/pkg/storage/chunk/storage/caching_index_client.go b/pkg/storage/chunk/storage/caching_index_client.go index 2904d0aaecb28..e6089b0747770 100644 --- a/pkg/storage/chunk/storage/caching_index_client.go +++ b/pkg/storage/chunk/storage/caching_index_client.go @@ -199,8 +199,12 @@ func (s *cachingIndexClient) queryPages(ctx context.Context, queries []chunk.Ind callback(query, batch) } } - s.cacheStore(ctx, keys, batches) - return cardinalityErr + + err := s.cacheStore(ctx, keys, batches) + if cardinalityErr != nil { + return cardinalityErr + } + return err } } From 6c888fd59ae9e6890fba19acb0fcd51af04281eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Tue, 11 Jan 2022 12:43:07 +0800 Subject: [PATCH 06/10] Update pkg/storage/chunk/cache/instrumented.go Co-authored-by: Karsten Jeschkies --- pkg/storage/chunk/cache/instrumented.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/chunk/cache/instrumented.go b/pkg/storage/chunk/cache/instrumented.go index c1c9c66195f17..fb1f79c135b68 100644 --- a/pkg/storage/chunk/cache/instrumented.go +++ b/pkg/storage/chunk/cache/instrumented.go @@ -73,8 +73,7 @@ func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]b return instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys", len(keys))) - err := i.Cache.Store(ctx, keys, bufs) - return err + return i.Cache.Store(ctx, keys, bufs) }) } From e522e8d8d43c72d2e533b99b782f897fb9e3e529 Mon Sep 17 00:00:00 2001 From: fuling Date: Tue, 11 Jan 2022 12:48:59 +0800 Subject: [PATCH 07/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"=20#4891?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/storage/chunk/cache/background.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/chunk/cache/background.go b/pkg/storage/chunk/cache/background.go index edecfdec8f1f5..10cf8048b949e 100644 --- a/pkg/storage/chunk/cache/background.go +++ b/pkg/storage/chunk/cache/background.go @@ -5,6 +5,8 @@ import ( "flag" "sync" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/log/level" opentracing "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" @@ -121,6 +123,7 @@ func (c *backgroundCache) writeBackLoop() { c.queueLength.Sub(float64(len(bgWrite.keys))) err := c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs) if err != nil { + level.Warn(util_log.Logger).Log("msg", "backgroundCache writeBackLoop Cache.Store fail", "err", err) continue } From f5177abebb251543357aef579691eae25353961b Mon Sep 17 00:00:00 2001 From: 
fuling Date: Fri, 14 Jan 2022 17:32:23 +0800 Subject: [PATCH 08/10] =?UTF-8?q?[optimization]=20cache=20prometheus=20:?= =?UTF-8?q?=20fix=20"loki=5Fcache=5Frequest=5Fduration=5Fseconds=5Fbucket"?= =?UTF-8?q?=20=E2=80=98status=5Fcode=E2=80=99=20label=20always=20equals=20?= =?UTF-8?q?"200"=20#4891?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/querier/queryrange/queryrangebase/results_cache.go | 4 ++-- pkg/storage/chunk/cache/cache.go | 7 ------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index d04639596b4a8..18b396afb2eae 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -559,7 +559,7 @@ func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Du } func (s resultsCache) get(ctx context.Context, key string) ([]Extent, bool) { - found, bufs, _ , _:= s.cache.Fetch(ctx, []string{cache.HashKey(key)}) + found, bufs, _, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)}) if len(found) != 1 { return nil, false } @@ -600,7 +600,7 @@ func (s resultsCache) put(ctx context.Context, key string, extents []Extent) { return } - s.cache.Store(ctx, []string{cache.HashKey(key)}, [][]byte{buf}) + _ = s.cache.Store(ctx, []string{cache.HashKey(key)}, [][]byte{buf}) } func jaegerTraceID(ctx context.Context) string { diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index b930ff0e662b4..f6a3e7b9a9f65 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -24,13 +24,6 @@ type Cache interface { Stop() } -type Cache2 interface { - Store(ctx context.Context, key []string, buf [][]byte) error - Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) - Stop() -} - - // Config for building Caches. 
type Config struct { EnableFifoCache bool `yaml:"enable_fifocache"` From 92efa3e4a5e6e7ee98b0227c93cc828638b18eb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Fri, 14 Jan 2022 23:47:06 +0800 Subject: [PATCH 09/10] Update pkg/querier/queryrange/roundtrip.go Co-authored-by: Owen Diehl --- pkg/querier/queryrange/roundtrip.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 9b64030769203..2b5f96b58652c 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -18,7 +18,7 @@ import ( "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk" - lokicache "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/storage/chunk/cache" ) // Config is the configuration for the queryrange tripperware From 280278c05db7ae46056bf747e5332fb9d1e519c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Sat, 15 Jan 2022 00:01:41 +0800 Subject: [PATCH 10/10] Update roundtrip.go --- pkg/querier/queryrange/roundtrip.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 2b5f96b58652c..167ce482665de 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -392,7 +392,7 @@ func NewMetricTripperware( SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), ) - var c lokicache.Cache + var c cache.Cache if cfg.CacheResults { queryCacheMiddleware, cache, err := queryrangebase.NewResultsCacheMiddleware( log,
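
The net effect of this series is that cache.Cache's Store and Fetch now return errors, and the instrumented wrapper feeds those errors back to instr.CollectedRequest / instr.ErrorCode (or to memcacheStatusCode for the batched memcached path), so the status_code label on loki_cache_request_duration_seconds reflects real failures instead of being pinned at "200". Below is a minimal, self-contained Go sketch of that pattern, assuming a plain client_golang HistogramVec in place of Loki's instr helpers; the names statusCode, failingCache and the cache_request_duration_seconds metric are illustrative stand-ins, not code from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Cache mirrors the interface after this patch series: Store and Fetch
// surface errors so instrumentation can derive a real status code.
type Cache interface {
	Store(ctx context.Context, keys []string, bufs [][]byte) error
	Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
	Stop()
}

// statusCode maps an error to the label value recorded on the histogram.
// This is a simplified 200/500 mapping, loosely following what
// instr.ErrorCode and memcacheStatusCode do in the real code paths.
func statusCode(err error) string {
	if err == nil {
		return "200"
	}
	return "500"
}

// instrumentedCache is a stripped-down stand-in for Loki's instrumented
// cache wrapper: it times each call and records the outcome under a
// status_code label derived from the returned error.
type instrumentedCache struct {
	next            Cache
	requestDuration *prometheus.HistogramVec
}

func newInstrumentedCache(name string, next Cache, reg prometheus.Registerer) *instrumentedCache {
	h := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:        "cache_request_duration_seconds", // illustrative name, not the real loki_ metric
		Help:        "Time spent on cache requests.",
		Buckets:     prometheus.DefBuckets,
		ConstLabels: prometheus.Labels{"name": name},
	}, []string{"method", "status_code"})
	reg.MustRegister(h)
	return &instrumentedCache{next: next, requestDuration: h}
}

func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
	start := time.Now()
	found, bufs, missing, err := i.next.Fetch(ctx, keys)
	// Before this change the inner callback always returned nil, so this
	// label stayed at "200" even when the backend failed.
	i.requestDuration.WithLabelValues("fetch", statusCode(err)).Observe(time.Since(start).Seconds())
	return found, bufs, missing, err
}

func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) error {
	start := time.Now()
	err := i.next.Store(ctx, keys, bufs)
	i.requestDuration.WithLabelValues("store", statusCode(err)).Observe(time.Since(start).Seconds())
	return err
}

func (i *instrumentedCache) Stop() { i.next.Stop() }

// failingCache always errors, standing in for an unreachable memcached/redis.
type failingCache struct{}

func (failingCache) Store(context.Context, []string, [][]byte) error {
	return errors.New("connection refused")
}

func (failingCache) Fetch(_ context.Context, keys []string) ([]string, [][]byte, []string, error) {
	return nil, nil, keys, errors.New("connection refused")
}

func (failingCache) Stop() {}

func main() {
	reg := prometheus.NewRegistry()
	c := newInstrumentedCache("chunks", failingCache{}, reg)
	_, _, _, err := c.Fetch(context.Background(), []string{"key1"})
	fmt.Println("fetch err:", err) // the histogram now carries status_code="500" for this call
}

The design choice mirrors the patches above: errors are surfaced so the metrics (and callers) can tell a backend failure from a plain cache miss, while best-effort call sites such as the background write-back loop and writeBackCache log the error and carry on rather than failing the request.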