Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the ability the fetch label values from the store #1337

Merged
merged 2 commits into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,35 +188,77 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers .
return c.getMetricNameChunks(ctx, from, through, matchers, metricName)
}

func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQuery")
// LabelValuesForMetricName retrieves all label values for a single label name and metric name.
func (c *store) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName, labelName string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.LabelValues")
defer log.Span.Finish()
level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName)

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

shortcut, err := c.validateQueryTimeRange(ctx, from, &through)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}

queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(labelName))
if err != nil {
return nil, err
}

entries, err := c.lookupEntriesByQueries(ctx, queries)
if err != nil {
return nil, err
}

var result []string
for _, entry := range entries {
_, labelValue, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)
if err != nil {
return nil, err
}
result = append(result, string(labelValue))
}

sort.Strings(result)
result = uniqueStrings(result)
return result, nil
}

func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, through *model.Time) (bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange")
defer log.Span.Finish()

if *through < from {
return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
return false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return "", nil, false, err
return false, err
}

maxQueryLength := c.limits.MaxQueryLength(userID)
if maxQueryLength > 0 && (*through).Sub(from) > maxQueryLength {
return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength)
return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength)
}

now := model.Now()

if from.After(now) {
// time-span start is in future ... regard as legal
level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now)
return "", nil, true, nil
return true, nil
}

if from.After(now.Add(-c.cfg.MinChunkAge)) {
// no data relevant to this query will have arrived at the store yet
return "", nil, true, nil
return true, nil
}

if through.After(now.Add(5 * time.Minute)) {
Expand All @@ -225,6 +267,21 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod
*through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes
}

return false, nil
}

func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQuery")
defer log.Span.Finish()

shortcut, err := c.validateQueryTimeRange(ctx, from, through)
if err != nil {
return "", nil, false, err
}
if shortcut {
return "", nil, true, nil
}

// Check there is a metric name matcher of type equal,
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers)
if !ok || metricNameMatcher.Type != labels.MatchEqual {
Expand Down
112 changes: 112 additions & 0 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,118 @@ func TestChunkStore_Get(t *testing.T) {
}
}

func TestChunkStore_LabelValuesForMetricName(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
now := model.Now()

fooMetric1 := model.Metric{
model.MetricNameLabel: "foo",
"bar": "baz",
"toms": "code",
"flip": "flop",
}
fooMetric2 := model.Metric{
model.MetricNameLabel: "foo",
"bar": "beep",
"toms": "code",
}
fooMetric3 := model.Metric{
model.MetricNameLabel: "foo",
"flip": "flap",
"bar": "bop",
}

// barMetric1 is a subset of barMetric2 to test over-matching bug.
barMetric1 := model.Metric{
model.MetricNameLabel: "bar",
"bar": "baz",
}
barMetric2 := model.Metric{
model.MetricNameLabel: "bar",
"bar": "baz",
"toms": "code",
}

fooChunk1 := dummyChunkFor(now, fooMetric1)
fooChunk2 := dummyChunkFor(now, fooMetric2)
fooChunk3 := dummyChunkFor(now, fooMetric3)

barChunk1 := dummyChunkFor(now, barMetric1)
barChunk2 := dummyChunkFor(now, barMetric2)

for _, tc := range []struct {
metricName, labelName string
expect []string
}{
{
`foo`, `bar`,
[]string{"baz", "beep", "bop"},
},
{
`bar`, `toms`,
[]string{"code"},
},
{
`bar`, `bar`,
[]string{"baz"},
},
{
`foo`, `foo`,
nil,
},
{
`foo`, `flip`,
[]string{"flap", "flop"},
},
} {
for _, schema := range schemas {
for _, storeCase := range stores {
t.Run(fmt.Sprintf("%s / %s / %s / %s", tc.metricName, tc.labelName, schema.name, storeCase.name), func(t *testing.T) {
t.Log("========= Running labelValues with metricName", tc.metricName, "with labelName", tc.labelName, "with schema", schema.name)
storeCfg := storeCase.configFn()
store := newTestChunkStoreConfig(t, schema.name, storeCfg)
defer store.Stop()

if err := store.Put(ctx, []Chunk{
fooChunk1,
fooChunk2,
fooChunk3,
barChunk1,
barChunk2,
}); err != nil {
t.Fatal(err)
}

// Query with ordinary time-range
labelValues1, err := store.LabelValuesForMetricName(ctx, now.Add(-time.Hour), now, tc.metricName, tc.labelName)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelValues1) {
t.Fatalf("%s/%s: wrong label values - %s", tc.metricName, tc.labelName, test.Diff(tc.expect, labelValues1))
}

// Pushing end of time-range into future should yield exact same resultset
labelValues2, err := store.LabelValuesForMetricName(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName, tc.labelName)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelValues2) {
t.Fatalf("%s/%s: wrong label values - %s", tc.metricName, tc.labelName, test.Diff(tc.expect, labelValues2))
}

// Query with both begin & end of time-range in future should yield empty resultset
labelValues3, err := store.LabelValuesForMetricName(ctx, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName, tc.labelName)
require.NoError(t, err)
if len(labelValues3) != 0 {
t.Fatalf("%s/%s: future query should yield empty resultset ... actually got %v label values: %#v",
tc.metricName, tc.labelName, len(labelValues3), labelValues3)
}
})
}
}
}

}

// TestChunkStore_getMetricNameChunks tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_getMetricNameChunks(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
Expand Down
15 changes: 15 additions & 0 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Store interface {
Put(ctx context.Context, chunks []Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
Stop()
}

Expand Down Expand Up @@ -88,6 +89,20 @@ func (c compositeStore) Get(ctx context.Context, from, through model.Time, match
return results, err
}

// LabelValuesForMetricName retrieves all label values for a single label name and metric name.
func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
var result []string
err := c.forStores(from, through, func(from, through model.Time, store Store) error {
labelValues, err := store.LabelValuesForMetricName(ctx, from, through, metricName, labelName)
if err != nil {
return err
}
result = append(result, labelValues...)
return nil
})
return result, err
}

func (c compositeStore) Stop() {
for _, store := range c.stores {
store.Stop()
Expand Down
3 changes: 3 additions & 0 deletions pkg/chunk/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func (m mockStore) PutOne(ctx context.Context, from, through model.Time, chunk C
func (m mockStore) Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) {
return nil, nil
}
func (m mockStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
return nil, nil
}

func (m mockStore) Stop() {}

Expand Down