Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LabelNamesForMetricName for the series store #1346

Merged
merged 4 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,44 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode
}
result = append(result, string(labelValue))
}

sort.Strings(result)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
result = uniqueStrings(result)
return result, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *store) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName")
defer log.Span.Finish()
level.Debug(log).Log("from", from, "through", through, "metricName", metricName)

shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}

chunks, err := c.lookupChunksByMetricName(ctx, from, through, nil, metricName)
if err != nil {
return nil, err
}
level.Debug(log).Log("msg", "Chunks in index", "chunks", len(chunks))

// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks))

// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}
return labelNamesFromChunks(allChunks), nil
}

func (c *store) validateQueryTimeRange(ctx context.Context, from *model.Time, through *model.Time) (bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange")
defer log.Span.Finish()
Expand Down
102 changes: 102 additions & 0 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,108 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) {

}

func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
now := model.Now()

fooMetric1 := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "baz"},
{Name: "flip", Value: "flop"},
{Name: "toms", Value: "code"},
}
fooMetric2 := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "beep"},
{Name: "toms", Value: "code"},
}
fooMetric3 := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "bop"},
{Name: "flip", Value: "flap"},
}

// barMetric1 is a subset of barMetric2 to test over-matching bug.
barMetric1 := labels.Labels{
{Name: labels.MetricName, Value: "bar"},
{Name: "bar", Value: "baz"},
}
barMetric2 := labels.Labels{
{Name: labels.MetricName, Value: "bar"},
{Name: "bar", Value: "baz"},
{Name: "toms", Value: "code"},
}

fooChunk1 := dummyChunkFor(now, fooMetric1)
fooChunk2 := dummyChunkFor(now, fooMetric2)
fooChunk3 := dummyChunkFor(now, fooMetric3)
fooChunk4 := dummyChunkFor(now.Add(-time.Hour), fooMetric1) // same series but different chunk

barChunk1 := dummyChunkFor(now, barMetric1)
barChunk2 := dummyChunkFor(now, barMetric2)

for _, tc := range []struct {
metricName string
expect []string
}{
{
`foo`,
[]string{"bar", "flip", "toms"},
},
{
`bar`,
[]string{"bar", "toms"},
},
} {
for _, schema := range schemas {
for _, storeCase := range stores {
t.Run(fmt.Sprintf("%s / %s / %s ", tc.metricName, schema.name, storeCase.name), func(t *testing.T) {
t.Log("========= Running labelNames with metricName", tc.metricName, "with schema", schema.name)
storeCfg := storeCase.configFn()
store := newTestChunkStoreConfig(t, schema.name, storeCfg)
defer store.Stop()

if err := store.Put(ctx, []Chunk{
fooChunk1,
fooChunk2,
fooChunk3,
fooChunk4,
barChunk1,
barChunk2,
}); err != nil {
t.Fatal(err)
}

// Query with ordinary time-range
labelNames1, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now, tc.metricName)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames1) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames1))
}

// Pushing end of time-range into future should yield exact same resultset
labelNames2, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames2) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames2))
}

// Query with both begin & end of time-range in future should yield empty resultset
labelNames3, err := store.LabelNamesForMetricName(ctx, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName)
require.NoError(t, err)
if len(labelNames3) != 0 {
t.Fatalf("%s: future query should yield empty resultset ... actually got %v label names: %#v",
tc.metricName, len(labelNames3), labelNames3)
}
})
}
}
}

}

// TestChunkStore_getMetricNameChunks tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_getMetricNameChunks(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
Expand Down
34 changes: 34 additions & 0 deletions pkg/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"context"
"sort"
"sync"

"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -36,6 +37,39 @@ func keysFromChunks(chunks []Chunk) []string {
return keys
}

func labelNamesFromChunks(chunks []Chunk) []string {
keys := map[string]struct{}{}
var result []string
for _, c := range chunks {
for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
if _, ok := keys[string(l.Name)]; !ok {
keys[string(l.Name)] = struct{}{}
result = append(result, string(l.Name))
}
}
}
}
sort.Strings(result)
return result
}

func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) {
filtered := make([]Chunk, 0, len(chunks))
keys := make([]string, 0, len(chunks))
uniqueFp := map[model.Fingerprint]struct{}{}

for _, chunk := range chunks {
if _, ok := uniqueFp[chunk.Fingerprint]; ok {
continue
}
filtered = append(filtered, chunk)
keys = append(keys, chunk.ExternalKey())
uniqueFp[chunk.Fingerprint] = struct{}{}
}
return filtered, keys
}

func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk {
filteredChunks := make([]Chunk, 0, len(chunks))
outer:
Expand Down
15 changes: 15 additions & 0 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Store interface {
// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error)
Stop()
}

Expand Down Expand Up @@ -106,6 +107,20 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro
return result, err
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c compositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
var result []string
err := c.forStores(from, through, func(from, through model.Time, store Store) error {
labelNames, err := store.LabelNamesForMetricName(ctx, from, through, metricName)
if err != nil {
return err
}
result = append(result, labelNames...)
return nil
})
return result, err
}

func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
chunkIDs := [][]Chunk{}
fetchers := []*Fetcher{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/chunk/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (m mockStore) GetChunkRefs(tx context.Context, from, through model.Time, ma
return nil, nil, nil
}

func (m mockStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
return nil, nil
}

func (m mockStore) Stop() {}

func TestCompositeStore(t *testing.T) {
Expand Down
55 changes: 55 additions & 0 deletions pkg/chunk/series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,61 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time
return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName")
defer log.Span.Finish()

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)

// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil)
if err != nil {
return nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))

// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))

chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
if err != nil {
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
return nil, err
}

// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("Chunks post filtering", len(chunks))

chunksPerQuery.Observe(float64(len(filtered)))

// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}
return labelNamesFromChunks(allChunks), nil
}

func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()
Expand Down