Skip to content

Commit

Permalink
Distributing sum queries (grafana#1878)
Browse files Browse the repository at this point in the history
* querier.sum-shards

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* addresses pr comments

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* instruments frontend sharding, splitby

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* LabelsSeriesID unexported again

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* removes unnecessary codec interface in astmapping

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* simplifies VectorSquasher as we never use matrices
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* combines queryrange series & value files
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* removes noops struct embedding strategy in schema, provides noop impls on all schemas instead
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* NewSubtreeFolder no longer can return an error as it inlines the jsonCodec
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* account for QueryIngestersWithin renaming
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes rebase import collision

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes rebase conflicts
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* -marks absent as non parallelizable

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* upstream promql compatibility changes

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* addresses pr comments
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* import collisions

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* linting - fixes goimports -local requirement

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* fixes merge conflicts

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* addresses pr comments

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* stylistic changes

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* s/downstream/sharded/

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* s/sum_shards/parallelise_shardable_queries/

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* query-audit docs

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* notes sharded parallelizations are only supported by chunk store

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* doc suggestions

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d authored Feb 21, 2020
1 parent 31d12a9 commit 1247427
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 10 deletions.
3 changes: 3 additions & 0 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
}

func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()

var lock sync.Mutex
var entries []IndexEntry
err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
Expand Down
2 changes: 2 additions & 0 deletions chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
tbmConfig TableManagerConfig
schemaCfg = DefaultSchemaConfig("", schemaName, 0)
)
err := schemaCfg.Validate()
require.NoError(t, err)
flagext.DefaultValues(&tbmConfig)
storage := NewMockStorage()
tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil)
Expand Down
9 changes: 6 additions & 3 deletions chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ func (c *Fetcher) worker() {
// FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
// lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
defer log.Span.Finish()

// Now fetch the actual chunk data from Memcache / S3
cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys)

fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs)
fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
if err != nil {
level.Warn(log).Log("msg", "error fetching from cache", "err", err)
}
Expand Down Expand Up @@ -199,12 +199,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {

// ProcessCacheResponse decodes the chunks coming back from the cache, separating
// hits and misses.
func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
var (
requests = make([]decodeRequest, 0, len(keys))
responses = make(chan decodeResponse)
missing []Chunk
)
log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse")
defer log.Span.Finish()

i, j := 0, 0
for i < len(chunks) && j < len(keys) {
Expand All @@ -229,6 +231,7 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b
for ; i < len(chunks); i++ {
missing = append(missing, chunks[i])
}
level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))

go func() {
for _, request := range requests {
Expand Down
61 changes: 59 additions & 2 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"

"github.com/go-kit/kit/log/level"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
)

const (
Expand Down Expand Up @@ -48,6 +53,7 @@ type Schema interface {
GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery

// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
Expand Down Expand Up @@ -218,6 +224,10 @@ func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string,
return result, nil
}

func (s schema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return s.entries.FilterReadQueries(queries, shard)
}

type entries interface {
GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
Expand All @@ -228,6 +238,7 @@ type entries interface {
GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
}

// original entries:
Expand Down Expand Up @@ -303,6 +314,10 @@ func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery,
return nil, ErrNotSupported
}

func (originalEntries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v3Schema went to base64 encoded label values & a version ID
// - range key: <label name>\0<base64(label value)>\0<chunk name>\0<version 1>

Expand Down Expand Up @@ -422,6 +437,10 @@ func (labelNameInHashKeyEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]I
return nil, ErrNotSupported
}

func (labelNameInHashKeyEntries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v5 schema is an extension of v4, with the chunk end time in the
// range key to improve query latency. However, it did it wrong
// so the chunk end times are ignored.
Expand Down Expand Up @@ -496,6 +515,10 @@ func (v5Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error
return nil, ErrNotSupported
}

func (v5Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
// moves label value out of range key (see #199).
type v6Entries struct{}
Expand Down Expand Up @@ -576,10 +599,13 @@ func (v6Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error
return nil, ErrNotSupported
}

// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct {
func (v6Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct{}

func (v9Entries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return nil, ErrNotSupported
}
Expand Down Expand Up @@ -675,6 +701,10 @@ func (v9Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error
return nil, ErrNotSupported
}

func (v9Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v10Entries builds on v9 by sharding index rows to reduce their size.
type v10Entries struct {
rowShards uint32
Expand Down Expand Up @@ -784,6 +814,33 @@ func (v10Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, erro
return nil, ErrNotSupported
}

// FilterReadQueries will return only queries that match a certain shard
func (v10Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) (matches []IndexQuery) {
if shard == nil {
return queries
}

for _, query := range queries {
s := strings.Split(query.HashValue, ":")[0]
n, err := strconv.Atoi(s)
if err != nil {
level.Error(util.Logger).Log(
"msg",
"Unable to determine shard from IndexQuery",
"HashValue",
query.HashValue,
"schema",
"v10",
)
}

if err == nil && n == shard.Shard {
matches = append(matches, query)
}
}
return matches
}

// v11Entries builds on v10 but adds index entries for each series to store respective labels.
type v11Entries struct {
v10Entries
Expand Down
12 changes: 10 additions & 2 deletions schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,18 @@ func (cfg *SchemaConfig) Validate() error {
return err
}
}

return nil
}

func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
return 0
default:
return 16
}
}

// ForEachAfter will call f() on every entry after t, splitting
// entries if necessary so there is an entry starting at t
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
Expand All @@ -219,7 +227,7 @@ func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)

// CreateSchema returns the schema defined by the PeriodConfig
func (cfg PeriodConfig) CreateSchema() Schema {
rowShards := uint32(16)
rowShards := defaultRowShards(cfg.Schema)
if cfg.RowShards > 0 {
rowShards = cfg.RowShards
}
Expand Down
67 changes: 67 additions & 0 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/test"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
)

type ByHashRangeKey []IndexEntry
Expand Down Expand Up @@ -387,5 +389,70 @@ func BenchmarkEncodeLabelsString(b *testing.B) {
}
b.Log("data size", len(data))
b.Log("decode", decoded)
}

// Ensure all currently defined entries can inhabit the entries interface
func TestEnsureEntriesInhabitInterface(t *testing.T) {
var _ = []entries{
originalEntries{},
base64Entries{},
labelNameInHashKeyEntries{},
v5Entries{},
v6Entries{},
v9Entries{},
v10Entries{},
v11Entries{},
}
}

func TestV10IndexQueries(t *testing.T) {
fromShards := func(n int) (res []IndexQuery) {
for i := 0; i < n; i++ {
res = append(res, IndexQuery{
TableName: "tbl",
HashValue: fmt.Sprintf("%02d:%s:%s:%s", i, "hash", "metric", "label"),
RangeValueStart: []byte(string(i)),
ValueEqual: []byte(string(i)),
})
}
return res
}

var testExprs = []struct {
name string
queries []IndexQuery
shard *astmapper.ShardAnnotation
expected []IndexQuery
}{
{
name: "passthrough when no shard specified",
queries: fromShards(2),
shard: nil,
expected: fromShards(2),
},
{
name: "out of bounds shard returns 0 matches",
queries: fromShards(2),
shard: &astmapper.ShardAnnotation{
Shard: 3,
},
expected: nil,
},
{
name: "return correct shard",
queries: fromShards(3),
shard: &astmapper.ShardAnnotation{
Shard: 1,
},
expected: []IndexQuery{fromShards(2)[1]},
},
}

for _, c := range testExprs {
t.Run(c.name, func(t *testing.T) {
s := v10Entries{}
filtered := s.FilterReadQueries(c.queries, c.shard)
require.Equal(t, c.expected, filtered)
})
}
}
41 changes: 38 additions & 3 deletions series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -131,6 +132,15 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode
return nil, err
}

// inject artificial __cortex_shard__ labels if present in the query. GetChunkRefs guarantees any chunk refs match the shard.
shard, _, err := astmapper.ShardFromMatchers(allMatchers)
if err != nil {
return nil, err
}
if shard != nil {
injectShardLabels(allChunks, *shard)
}

// Filter out chunks based on the empty matchers in the query.
filteredChunks := filterChunksByMatchers(allChunks, allMatchers)
return filteredChunks, nil
Expand Down Expand Up @@ -252,10 +262,21 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()

// Check if one of the labels is a shard annotation, pass that information to lookupSeriesByMetricNameMatcher,
// and remove the label.
shard, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers)
if err != nil {
return nil, err
}

if shard != nil {
matchers = append(matchers[:shardLabelIndex], matchers[shardLabelIndex+1:]...)
}

// Just get series for metric if there are no matchers
if len(matchers) == 0 {
indexLookupsPerQuery.Observe(1)
series, err := c.lookupSeriesByMetricNameMatcher(ctx, from, through, userID, metricName, nil)
series, err := c.lookupSeriesByMetricNameMatcher(ctx, from, through, userID, metricName, nil, shard)
if err != nil {
preIntersectionPerQuery.Observe(float64(len(series)))
postIntersectionPerQuery.Observe(float64(len(series)))
Expand All @@ -269,7 +290,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
indexLookupsPerQuery.Observe(float64(len(matchers)))
for _, matcher := range matchers {
go func(matcher *labels.Matcher) {
ids, err := c.lookupSeriesByMetricNameMatcher(ctx, from, through, userID, metricName, matcher)
ids, err := c.lookupSeriesByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, shard)
if err != nil {
incomingErrors <- err
return
Expand Down Expand Up @@ -320,7 +341,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
return ids, nil
}

func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher) ([]string, error) {
func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, shard *astmapper.ShardAnnotation) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatcher", "metricName", metricName, "matcher", matcher)
defer log.Span.Finish()

Expand All @@ -341,6 +362,10 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
}
level.Debug(log).Log("queries", len(queries))

queries = c.schema.FilterReadQueries(queries, shard)

level.Debug(log).Log("filteredQueries", len(queries))

entries, err := c.lookupEntriesByQueries(ctx, queries)
if e, ok := err.(CardinalityExceededError); ok {
e.MetricName = metricName
Expand Down Expand Up @@ -509,3 +534,13 @@ func (c *seriesStore) calculateIndexEntries(ctx context.Context, from, through m

return result, missing, nil
}

func injectShardLabels(chunks []Chunk, shard astmapper.ShardAnnotation) {
for i, chunk := range chunks {
b := labels.NewBuilder(chunk.Metric)
l := shard.Label()
b.Set(l.Name, l.Value)
chunk.Metric = b.Labels()
chunks[i] = chunk
}
}

0 comments on commit 1247427

Please sign in to comment.