Skip to content

Commit

Permalink
adds LabelNamesForMetricName for the series store
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Jun 7, 2019
1 parent a491c4a commit 813c1b8
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 0 deletions.
42 changes: 42 additions & 0 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,49 @@ func (c *store) LabelValuesForMetricName(ctx context.Context, from, through mode
}
result = append(result, string(labelValue))
}
sort.Strings(result)
result = uniqueStrings(result)
return result, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *store) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName")
defer log.Span.Finish()
level.Debug(log).Log("from", from, "through", through, "metricName", metricName)

shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}

chunks, err := c.lookupChunksByMetricName(ctx, from, through, nil, metricName)
if err != nil {
return nil, err
}
level.Debug(log).Log("Chunks in index", len(chunks))

// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerPrint(filtered)
level.Debug(log).Log("Chunks post filtering", len(chunks))

// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}
var result []string
for _, c := range allChunks {
for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
result = append(result, string(l.Name))
}
}
}
sort.Strings(result)
result = uniqueStrings(result)
return result, nil
Expand Down
102 changes: 102 additions & 0 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,108 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) {

}

func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
now := model.Now()

fooMetric1 := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "baz"},
{Name: "flip", Value: "flop"},
{Name: "toms", Value: "code"},
}
fooMetric2 := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "beep"},
{Name: "toms", Value: "code"},
}
fooMetric3 := labels.Labels{
{Name: labels.MetricName, Value: "foo"},
{Name: "bar", Value: "bop"},
{Name: "flip", Value: "flap"},
}

// barMetric1 is a subset of barMetric2 to test over-matching bug.
barMetric1 := labels.Labels{
{Name: labels.MetricName, Value: "bar"},
{Name: "bar", Value: "baz"},
}
barMetric2 := labels.Labels{
{Name: labels.MetricName, Value: "bar"},
{Name: "bar", Value: "baz"},
{Name: "toms", Value: "code"},
}

fooChunk1 := dummyChunkFor(now, fooMetric1)
fooChunk2 := dummyChunkFor(now, fooMetric2)
fooChunk3 := dummyChunkFor(now, fooMetric3)
fooChunk4 := dummyChunkFor(now.Add(-time.Hour), fooMetric1) // same series but different chunk

barChunk1 := dummyChunkFor(now, barMetric1)
barChunk2 := dummyChunkFor(now, barMetric2)

for _, tc := range []struct {
metricName string
expect []string
}{
{
`foo`,
[]string{"bar", "flip", "toms"},
},
{
`bar`,
[]string{"bar", "toms"},
},
} {
for _, schema := range schemas {
for _, storeCase := range stores {
t.Run(fmt.Sprintf("%s / %s / %s ", tc.metricName, schema.name, storeCase.name), func(t *testing.T) {
t.Log("========= Running labelNames with metricName", tc.metricName, "with schema", schema.name)
storeCfg := storeCase.configFn()
store := newTestChunkStoreConfig(t, schema.name, storeCfg)
defer store.Stop()

if err := store.Put(ctx, []Chunk{
fooChunk1,
fooChunk2,
fooChunk3,
fooChunk4,
barChunk1,
barChunk2,
}); err != nil {
t.Fatal(err)
}

// Query with ordinary time-range
labelNames1, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now, tc.metricName)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames1) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames1))
}

// Pushing end of time-range into future should yield exact same resultset
labelNames2, err := store.LabelNamesForMetricName(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames2) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames2))
}

// Query with both begin & end of time-range in future should yield empty resultset
labelNames3, err := store.LabelNamesForMetricName(ctx, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName)
require.NoError(t, err)
if len(labelNames3) != 0 {
t.Fatalf("%s: future query should yield empty resultset ... actually got %v label names: %#v",
tc.metricName, len(labelNames3), labelNames3)
}
})
}
}
}

}

// TestChunkStore_getMetricNameChunks tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_getMetricNameChunks(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func keysFromChunks(chunks []Chunk) []string {
return keys
}

func filterChunksByUniqueFingerPrint(chunks []Chunk) ([]Chunk, []string) {
filtered := make([]Chunk, 0, len(chunks))
keys := make([]string, 0, len(chunks))
uniqueFp := map[model.Fingerprint]struct{}{}

for _, chunk := range chunks {
if _, ok := uniqueFp[chunk.Fingerprint]; ok {
continue
}
filtered = append(filtered, chunk)
keys = append(keys, chunk.ExternalKey())
uniqueFp[chunk.Fingerprint] = struct{}{}
}
return filtered, keys
}

func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk {
filteredChunks := make([]Chunk, 0, len(chunks))
outer:
Expand Down
15 changes: 15 additions & 0 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Store interface {
// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error)
Stop()
}

Expand Down Expand Up @@ -106,6 +107,20 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro
return result, err
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c compositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
var result []string
err := c.forStores(from, through, func(from, through model.Time, store Store) error {
labelNames, err := store.LabelNamesForMetricName(ctx, from, through, metricName)
if err != nil {
return err
}
result = append(result, labelNames...)
return nil
})
return result, err
}

func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
chunkIDs := [][]Chunk{}
fetchers := []*Fetcher{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/chunk/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (m mockStore) GetChunkRefs(tx context.Context, from, through model.Time, ma
return nil, nil, nil
}

func (m mockStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
return nil, nil
}

func (m mockStore) Stop() {}

func TestCompositeStore(t *testing.T) {
Expand Down
67 changes: 67 additions & 0 deletions pkg/chunk/series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"sort"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -191,6 +192,72 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time
return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.LabelNamesForMetricName")
defer log.Span.Finish()

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

shortcut, err := c.validateQueryTimeRange(ctx, &from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)

// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil)
if err != nil {
return nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))

// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))

chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
if err != nil {
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
return nil, err
}

// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
filtered := filterChunksByTime(from, through, chunks)
filtered, keys := filterChunksByUniqueFingerPrint(filtered)
level.Debug(log).Log("Chunks post filtering", len(chunks))

chunksPerQuery.Observe(float64(len(filtered)))

// Now fetch the actual chunk data from Memcache / S3
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
}

var result []string
for _, c := range allChunks {
for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
result = append(result, string(l.Name))
}
}
}
sort.Strings(result)
result = uniqueStrings(result)
return result, nil
}

func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()
Expand Down

0 comments on commit 813c1b8

Please sign in to comment.