diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 7ae86fc3de..efafa93b09 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -228,12 +228,44 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode } result = append(result, string(labelValue)) } - sort.Strings(result) result = uniqueStrings(result) return result, nil } +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c *store) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "metricName", metricName) + + shortcut, err := c.validateQueryTimeRange(ctx, &from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + chunks, err := c.lookupChunksByMetricName(ctx, from, through, nil, metricName) + if err != nil { + return nil, err + } + level.Debug(log).Log("msg", "Chunks in index", "chunks", len(chunks)) + + // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint + filtered := filterChunksByTime(from, through, chunks) + filtered, keys := filterChunksByUniqueFingerprint(filtered) + level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks)) + + // Now fetch the actual chunk data from Memcache / S3 + allChunks, err := c.FetchChunks(ctx, filtered, keys) + if err != nil { + level.Error(log).Log("msg", "FetchChunks", "err", err) + return nil, err + } + return labelNamesFromChunks(allChunks), nil +} + func (c *store) validateQueryTimeRange(ctx context.Context, from *model.Time, through *model.Time) (bool, error) { log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange") defer log.Span.Finish() diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index bd2ea91839..9e91c60ca8 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -370,6 +370,108 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) { } +func TestChunkStore_LabelNamesForMetricName(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), userID) + now := model.Now() + + fooMetric1 := labels.Labels{ + {Name: labels.MetricName, Value: "foo"}, + {Name: "bar", Value: "baz"}, + {Name: "flip", Value: "flop"}, + {Name: "toms", Value: "code"}, + } + fooMetric2 := labels.Labels{ + {Name: labels.MetricName, Value: "foo"}, + {Name: "bar", Value: "beep"}, + {Name: "toms", Value: "code"}, + } + fooMetric3 := labels.Labels{ + {Name: labels.MetricName, Value: "foo"}, + {Name: "bar", Value: "bop"}, + {Name: "flip", Value: "flap"}, + } + + // barMetric1 is a subset of barMetric2 to test over-matching bug. + barMetric1 := labels.Labels{ + {Name: labels.MetricName, Value: "bar"}, + {Name: "bar", Value: "baz"}, + } + barMetric2 := labels.Labels{ + {Name: labels.MetricName, Value: "bar"}, + {Name: "bar", Value: "baz"}, + {Name: "toms", Value: "code"}, + } + + fooChunk1 := dummyChunkFor(now, fooMetric1) + fooChunk2 := dummyChunkFor(now, fooMetric2) + fooChunk3 := dummyChunkFor(now, fooMetric3) + fooChunk4 := dummyChunkFor(now.Add(-time.Hour), fooMetric1) // same series but different chunk + + barChunk1 := dummyChunkFor(now, barMetric1) + barChunk2 := dummyChunkFor(now, barMetric2) + + for _, tc := range []struct { + metricName string + expect []string + }{ + { + `foo`, + []string{"bar", "flip", "toms"}, + }, + { + `bar`, + []string{"bar", "toms"}, + }, + } { + for _, schema := range schemas { + for _, storeCase := range stores { + t.Run(fmt.Sprintf("%s / %s / %s ", tc.metricName, schema.name, storeCase.name), func(t *testing.T) { + t.Log("========= Running labelNames with metricName", tc.metricName, "with schema", schema.name) + storeCfg := storeCase.configFn() + store := newTestChunkStoreConfig(t, schema.name, storeCfg) + defer store.Stop() + + if err := store.Put(ctx, []Chunk{ + fooChunk1, + fooChunk2, + fooChunk3, + fooChunk4, + barChunk1, + barChunk2, + }); err != nil { + t.Fatal(err) + } + + // Query with ordinary time-range + labelNames1, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now, tc.metricName) + require.NoError(t, err) + + if !reflect.DeepEqual(tc.expect, labelNames1) { + t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames1)) + } + + // Pushing end of time-range into future should yield exact same resultset + labelNames2, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName) + require.NoError(t, err) + + if !reflect.DeepEqual(tc.expect, labelNames2) { + t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames2)) + } + + // Query with both begin & end of time-range in future should yield empty resultset + labelNames3, err := store.LabelNamesForMetricName(ctx, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName) + require.NoError(t, err) + if len(labelNames3) != 0 { + t.Fatalf("%s: future query should yield empty resultset ... actually got %v label names: %#v", + tc.metricName, len(labelNames3), labelNames3) + } + }) + } + } + } + +} + // TestChunkStore_getMetricNameChunks tests if chunks are fetched correctly when we have the metric name func TestChunkStore_getMetricNameChunks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go index c5e7cade69..19d1fc499c 100644 --- a/pkg/chunk/chunk_store_utils.go +++ b/pkg/chunk/chunk_store_utils.go @@ -2,6 +2,7 @@ package chunk import ( "context" + "sort" "sync" "github.com/go-kit/kit/log/level" @@ -36,6 +37,39 @@ func keysFromChunks(chunks []Chunk) []string { return keys } +func labelNamesFromChunks(chunks []Chunk) []string { + keys := map[string]struct{}{} + var result []string + for _, c := range chunks { + for _, l := range c.Metric { + if l.Name != model.MetricNameLabel { + if _, ok := keys[string(l.Name)]; !ok { + keys[string(l.Name)] = struct{}{} + result = append(result, string(l.Name)) + } + } + } + } + sort.Strings(result) + return result +} + +func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) { + filtered := make([]Chunk, 0, len(chunks)) + keys := make([]string, 0, len(chunks)) + uniqueFp := map[model.Fingerprint]struct{}{} + + for _, chunk := range chunks { + if _, ok := uniqueFp[chunk.Fingerprint]; ok { + continue + } + filtered = append(filtered, chunk) + keys = append(keys, chunk.ExternalKey()) + uniqueFp[chunk.Fingerprint] = struct{}{} + } + return filtered, keys +} + func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk { filteredChunks := make([]Chunk, 0, len(chunks)) outer: diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index 762aa424e3..c00c3da8db 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -19,6 +19,7 @@ type Store interface { // using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) + LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) Stop() } @@ -106,6 +107,20 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro return result, err } +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c compositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + var result []string + err := c.forStores(from, through, func(from, through model.Time, store Store) error { + labelNames, err := store.LabelNamesForMetricName(ctx, from, through, metricName) + if err != nil { + return err + } + result = append(result, labelNames...) + return nil + }) + return result, err +} + func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) { chunkIDs := [][]Chunk{} fetchers := []*Fetcher{} diff --git a/pkg/chunk/composite_store_test.go b/pkg/chunk/composite_store_test.go index 7afd5fd85d..4f0bd50736 100644 --- a/pkg/chunk/composite_store_test.go +++ b/pkg/chunk/composite_store_test.go @@ -32,6 +32,10 @@ func (m mockStore) GetChunkRefs(tx context.Context, from, through model.Time, ma return nil, nil, nil } +func (m mockStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + return nil, nil +} + func (m mockStore) Stop() {} func TestCompositeStore(t *testing.T) { diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index af7343bbfd..688f13c023 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -191,6 +191,61 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil } +// LabelNamesForMetricName retrieves all label names for a metric name. +func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) { + log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName") + defer log.Span.Finish() + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + shortcut, err := c.validateQueryTimeRange(ctx, &from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + level.Debug(log).Log("metric", metricName) + + // Fetch the series IDs from the index + seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil) + if err != nil { + return nil, err + } + level.Debug(log).Log("series-ids", len(seriesIDs)) + + // Lookup the series in the index to get the chunks. + chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs) + if err != nil { + level.Error(log).Log("msg", "lookupChunksBySeries", "err", err) + return nil, err + } + level.Debug(log).Log("chunk-ids", len(chunkIDs)) + + chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) + if err != nil { + level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err) + return nil, err + } + + // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint + filtered := filterChunksByTime(from, through, chunks) + filtered, keys := filterChunksByUniqueFingerprint(filtered) + level.Debug(log).Log("Chunks post filtering", len(chunks)) + + chunksPerQuery.Observe(float64(len(filtered))) + + // Now fetch the actual chunk data from Memcache / S3 + allChunks, err := c.FetchChunks(ctx, filtered, keys) + if err != nil { + level.Error(log).Log("msg", "FetchChunks", "err", err) + return nil, err + } + return labelNamesFromChunks(allChunks), nil +} + func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) { log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers)) defer log.Span.Finish()