Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LabelNamesForMetricName for the series store #1346

Merged
merged 4 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,49 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode
}
result = append(result, string(labelValue))
}
sort.Strings(result)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
result = uniqueStrings(result)
return result, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *store) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName")
defer log.Span.Finish()
level.Debug(log).Log("from", from, "through", through, "metricName", metricName)

shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}

chunks, err := c.lookupChunksByMetricName(ctx, from, through, nil, metricName)
if err != nil {
return nil, err
}
level.Debug(log).Log("msg", "Chunks in index", "chunks", len(chunks))

// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks))

// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}
var result []string
for _, c := range allChunks {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit seems likely to be inefficient - most chunks will have the same label names, but potentially thousands of unique fingerprints, so you will add thousands of copies of the same string then sort them then dedupe them.
Suggest using a map instead.

Copy link
Contributor Author

@cyriltovena cyriltovena Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm only fetching a single chunk per unique fingerprint (line 257):

	filtered, keys := filterChunksByUniqueFingerprint(filtered)

But you're right: chunks with unique fingerprints can still share many duplicate label names. I'll improve this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Voila !

for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
result = append(result, string(l.Name))
}
}
}
sort.Strings(result)
result = uniqueStrings(result)
return result, nil
Expand Down
102 changes: 102 additions & 0 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,108 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) {

}

// TestChunkStore_LabelNamesForMetricName stores a fixed set of "foo" and "bar"
// chunks in every schema / store combination and checks the label names
// returned for each metric name over three query windows: an ordinary range,
// a range whose end lies in the future, and a range entirely in the future.
func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), userID)
	now := model.Now()

	// Three distinct "foo" series with overlapping label names.
	fooSeriesA := labels.Labels{
		{Name: labels.MetricName, Value: "foo"},
		{Name: "bar", Value: "baz"},
		{Name: "flip", Value: "flop"},
		{Name: "toms", Value: "code"},
	}
	fooSeriesB := labels.Labels{
		{Name: labels.MetricName, Value: "foo"},
		{Name: "bar", Value: "beep"},
		{Name: "toms", Value: "code"},
	}
	fooSeriesC := labels.Labels{
		{Name: labels.MetricName, Value: "foo"},
		{Name: "bar", Value: "bop"},
		{Name: "flip", Value: "flap"},
	}

	// barSeriesA is a subset of barSeriesB to test over-matching bug.
	barSeriesA := labels.Labels{
		{Name: labels.MetricName, Value: "bar"},
		{Name: "bar", Value: "baz"},
	}
	barSeriesB := labels.Labels{
		{Name: labels.MetricName, Value: "bar"},
		{Name: "bar", Value: "baz"},
		{Name: "toms", Value: "code"},
	}

	chunkFooA := dummyChunkFor(now, fooSeriesA)
	chunkFooB := dummyChunkFor(now, fooSeriesB)
	chunkFooC := dummyChunkFor(now, fooSeriesC)
	// Same series as chunkFooA, but a different chunk one hour earlier.
	chunkFooAOld := dummyChunkFor(now.Add(-time.Hour), fooSeriesA)

	chunkBarA := dummyChunkFor(now, barSeriesA)
	chunkBarB := dummyChunkFor(now, barSeriesB)

	type labelNamesCase struct {
		metricName string
		expect     []string
	}
	cases := []labelNamesCase{
		{metricName: `foo`, expect: []string{"bar", "flip", "toms"}},
		{metricName: `bar`, expect: []string{"bar", "toms"}},
	}

	for _, tc := range cases {
		for _, schema := range schemas {
			for _, storeCase := range stores {
				subtest := fmt.Sprintf("%s / %s / %s ", tc.metricName, schema.name, storeCase.name)
				t.Run(subtest, func(t *testing.T) {
					t.Log("========= Running labelNames with metricName", tc.metricName, "with schema", schema.name)
					store := newTestChunkStoreConfig(t, schema.name, storeCase.configFn())
					defer store.Stop()

					stored := []Chunk{
						chunkFooA,
						chunkFooB,
						chunkFooC,
						chunkFooAOld,
						chunkBarA,
						chunkBarB,
					}
					if err := store.Put(ctx, stored); err != nil {
						t.Fatal(err)
					}

					// Query with ordinary time-range
					got, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now, tc.metricName)
					require.NoError(t, err)
					if !reflect.DeepEqual(tc.expect, got) {
						t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, got))
					}

					// Pushing end of time-range into future should yield exact same resultset
					got, err = store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName)
					require.NoError(t, err)
					if !reflect.DeepEqual(tc.expect, got) {
						t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, got))
					}

					// Query with both begin & end of time-range in future should yield empty resultset
					got, err = store.LabelNamesForMetricName(ctx, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName)
					require.NoError(t, err)
					if len(got) != 0 {
						t.Fatalf("%s: future query should yield empty resultset ... actually got %v label names: %#v",
							tc.metricName, len(got), got)
					}
				})
			}
		}
	}
}

// TestChunkStore_getMetricNameChunks tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_getMetricNameChunks(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func keysFromChunks(chunks []Chunk) []string {
return keys
}

// filterChunksByUniqueFingerprint keeps only the first chunk encountered for
// each fingerprint, preserving input order. It returns the retained chunks
// together with their external keys; the two slices are index-aligned.
func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) {
	seen := make(map[model.Fingerprint]struct{})
	kept := make([]Chunk, 0, len(chunks))
	keptKeys := make([]string, 0, len(chunks))

	for _, candidate := range chunks {
		if _, duplicate := seen[candidate.Fingerprint]; !duplicate {
			seen[candidate.Fingerprint] = struct{}{}
			kept = append(kept, candidate)
			keptKeys = append(keptKeys, candidate.ExternalKey())
		}
	}
	return kept, keptKeys
}

func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk {
filteredChunks := make([]Chunk, 0, len(chunks))
outer:
Expand Down
15 changes: 15 additions & 0 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Store interface {
// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error)
Stop()
}

Expand Down Expand Up @@ -106,6 +107,20 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro
return result, err
}

// LabelNamesForMetricName retrieves all label names for a metric name.
//
// The query is dispatched through forStores, and the label names returned by
// each underlying store are merged into a single slice. A name reported by
// more than one store is returned only once; names keep the order in which
// they were first seen.
//
// NOTE(review): the merged slice is not re-sorted here. If callers rely on a
// globally sorted result when a query spans several stores, sort it before
// returning.
//
// On error the result is nil and the error from forStores is returned.
func (c compositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
	var result []string
	// seen tracks names already merged so that the same label name coming
	// from two stores is not emitted twice.
	seen := map[string]struct{}{}

	err := c.forStores(from, through, func(from, through model.Time, store Store) error {
		labelNames, err := store.LabelNamesForMetricName(ctx, from, through, metricName)
		if err != nil {
			return err
		}
		for _, name := range labelNames {
			if _, ok := seen[name]; ok {
				continue
			}
			seen[name] = struct{}{}
			result = append(result, name)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return result, nil
}

func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
chunkIDs := [][]Chunk{}
fetchers := []*Fetcher{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/chunk/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (m mockStore) GetChunkRefs(tx context.Context, from, through model.Time, ma
return nil, nil, nil
}

// LabelNamesForMetricName is a stub: it ignores its arguments and always
// reports no label names and no error.
func (m mockStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
	var (
		names []string
		err   error
	)
	return names, err
}

// Stop is a no-op on the mock store (presumably present so that mockStore
// provides the Stop method the Store interface lists — confirm).
func (m mockStore) Stop() {}

func TestCompositeStore(t *testing.T) {
Expand Down
67 changes: 67 additions & 0 deletions pkg/chunk/series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"sort"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -191,6 +192,72 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time
return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName")
defer log.Span.Finish()

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)

// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil)
if err != nil {
return nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))

// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))

chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
if err != nil {
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
return nil, err
}

// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerprint(filtered)
level.Debug(log).Log("Chunks post filtering", len(chunks))

chunksPerQuery.Observe(float64(len(filtered)))

// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}

var result []string
for _, c := range allChunks {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
result = append(result, string(l.Name))
}
}
}
sort.Strings(result)
result = uniqueStrings(result)
return result, nil
}

func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()
Expand Down