Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve series store index queries #6045

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions pkg/storage/stores/series/index/caching_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/grafana/loki/pkg/storage/chunk/cache"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

var (
Expand Down Expand Up @@ -322,10 +321,6 @@ func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batc
}

func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) {
spanLogger := spanlogger.FromContext(ctx)
logger := util_log.WithContext(ctx, s.logger)
level.Debug(spanLogger).Log("requested", len(keys))

cacheGets.Add(float64(len(keys)))

// Build a map from hash -> key; NB there can be collisions here; we'll fetch
Expand Down Expand Up @@ -354,27 +349,22 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat
var readBatch ReadBatch

if err := proto.Unmarshal(bufs[j], &readBatch); err != nil {
level.Warn(spanLogger).Log("msg", "error unmarshalling index entry from cache", "err", err)
level.Warn(util_log.Logger).Log("msg", "error unmarshalling index entry from cache", "err", err)
cacheCorruptErrs.Inc()
continue
}

// Make sure the hash(key) is not a collision in the cache by looking at the
// key in the value.
if key != readBatch.Key {
level.Debug(spanLogger).Log("msg", "dropping index cache entry due to key collision", "key", key, "readBatch.Key", readBatch.Key, "expiry")
level.Debug(util_log.Logger).Log("msg", "dropping index cache entry due to key collision", "key", key, "readBatch.Key", readBatch.Key, "expiry")
continue
}

if readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry)) {
continue
}

if len(readBatch.Entries) != 0 {
// not using spanLogger to avoid over-inflating traces since the query count can go much higher
level.Debug(logger).Log("msg", "found index cache entries", "key", key, "count", len(readBatch.Entries))
}

cacheHits.Inc()
batches = append(batches, readBatch)
}
Expand All @@ -392,6 +382,5 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat
missed = append(missed, miss)
}

level.Debug(spanLogger).Log("hits", len(batches), "misses", len(misses))
return batches, missed
}
22 changes: 11 additions & 11 deletions pkg/storage/stores/series/index/schema_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) (
// v1 & v2 schema had three components - label name, label value and chunk ID.
// No version number.
case len(components) == 3:
chunkID = string(components[2])
labelValue = model.LabelValue(components[1])
chunkID = yoloString(components[2])
labelValue = model.LabelValue(yoloString(components[1]))
return

case len(components[3]) == 1:
Expand All @@ -205,42 +205,42 @@ func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) (
// "version" is 1 and label value is base64 encoded.
// (older code wrote "version" as 1, not '1')
case chunkTimeRangeKeyV1a, chunkTimeRangeKeyV1:
chunkID = string(components[2])
chunkID = yoloString(components[2])
labelValue, err = decodeBase64Value(components[1])
return

// v4 schema wrote v3 range keys and a new range key - version 2,
// with four components - <empty>, <empty>, chunk ID and version.
case chunkTimeRangeKeyV2:
chunkID = string(components[2])
chunkID = yoloString(components[2])
return

// v5 schema version 3 range key is chunk end time, <empty>, chunk ID, version
case chunkTimeRangeKeyV3:
chunkID = string(components[2])
chunkID = yoloString(components[2])
return

// v5 schema version 4 range key is chunk end time, label value, chunk ID, version
case chunkTimeRangeKeyV4:
chunkID = string(components[2])
chunkID = yoloString(components[2])
labelValue, err = decodeBase64Value(components[1])
return

// v6 schema added version 5 range keys, which have the label value written in
// to the value, not the range key. So they are [chunk end time, <empty>, chunk ID, version].
case chunkTimeRangeKeyV5:
chunkID = string(components[2])
labelValue = model.LabelValue(value)
chunkID = yoloString(components[2])
labelValue = model.LabelValue(yoloString(value))
return

// v9 schema actually return series IDs
case seriesRangeKeyV1:
chunkID = string(components[0])
chunkID = yoloString(components[0])
return

case labelSeriesRangeKeyV1:
chunkID = string(components[1])
labelValue = model.LabelValue(value)
chunkID = yoloString(components[1])
labelValue = model.LabelValue(yoloString(value))
return
}
}
Expand Down
34 changes: 18 additions & 16 deletions pkg/storage/stores/series/series_index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func (c *IndexStore) LabelValuesForMetricName(ctx context.Context, userID string
if err != nil {
return nil, err
}
// nolint:staticcheck
defer entriesPool.Put(entries)

var result util.UniqueStrings
for _, entry := range entries {
Expand Down Expand Up @@ -289,6 +291,8 @@ func (c *IndexStore) labelValuesForMetricNameWithMatchers(ctx context.Context, u
if err != nil {
return nil, err
}
// nolint:staticcheck
defer entriesPool.Put(entries)

result := util.NewUniqueStrings(len(entries))
for _, entry := range entries {
Expand Down Expand Up @@ -409,7 +413,6 @@ func (c *IndexStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thr
if err != nil {
return nil, err
}
unfilteredQueries := len(queries)

if filter != nil {
queries = filter(queries)
Expand All @@ -423,20 +426,13 @@ func (c *IndexStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thr
} else if err != nil {
return nil, err
}
// nolint:staticcheck
defer entriesPool.Put(entries)

ids, err := parseIndexEntries(ctx, entries, matcher)
if err != nil {
return nil, err
}
level.Debug(util_log.WithContext(ctx, util_log.Logger)).
Log(
"msg", "Store.lookupIdsByMetricNameMatcher",
"matcher", formatMatcher(matcher),
"queries", unfilteredQueries,
"filteredQueries", len(queries),
"entries", len(entries),
"ids", len(ids),
)

return ids, nil
}
Expand Down Expand Up @@ -486,14 +482,20 @@ func parseIndexEntries(_ context.Context, entries []index.Entry, matcher *labels
return result, nil
}

var entriesPool = sync.Pool{
New: func() interface{} {
return make([]index.Entry, 0, 1024)
},
}

func (c *IndexStore) lookupEntriesByQueries(ctx context.Context, queries []index.Query) ([]index.Entry, error) {
// Nothing to do if there are no queries.
if len(queries) == 0 {
return nil, nil
}

var lock sync.Mutex
var entries []index.Entry
entries := entriesPool.Get().([]index.Entry)[:0]
err := c.index.QueryPages(ctx, queries, func(query index.Query, resp index.ReadBatchResult) bool {
iter := resp.Iterator()
lock.Lock()
Expand Down Expand Up @@ -532,6 +534,9 @@ func (c *IndexStore) lookupLabelNamesBySeries(ctx context.Context, from, through
if err != nil {
return nil, err
}
// nolint:staticcheck
defer entriesPool.Put(entries)

level.Debug(log).Log("entries", len(entries))

var result util.UniqueStrings
Expand Down Expand Up @@ -595,11 +600,8 @@ func (c *IndexStore) lookupChunksBySeries(ctx context.Context, from, through mod
if err != nil {
return nil, err
}
level.Debug(util_log.WithContext(ctx, util_log.Logger)).Log(
"msg", "SeriesStore.lookupChunksBySeries",
"seriesIDs", len(seriesIDs),
"queries", len(queries),
"entries", len(entries))
// nolint:staticcheck
defer entriesPool.Put(entries)

result, err := parseIndexEntries(ctx, entries, nil)
return result, err
Expand Down
11 changes: 0 additions & 11 deletions pkg/storage/stores/series/series_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"unicode/utf8"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -171,13 +170,3 @@ func FindSetMatches(pattern string) []string {
}
return matches
}

// Using this function avoids logging of nil matcher, which works, but indirectly via panic and recover.
// That confuses attached debugger, which wants to breakpoint on each panic.
// Using simple check is also faster.
func formatMatcher(matcher *labels.Matcher) string {
if matcher == nil {
return "nil"
}
return matcher.String()
}