Skip to content

Commit

Permalink
Merge pull request grafana#981 from grafana/batch-index-lookups
Browse files Browse the repository at this point in the history
Batch index lookups (take #2)
  • Loading branch information
tomwilkie authored Sep 14, 2018
2 parents 675e1cc + bbc2cc2 commit 311a376
Show file tree
Hide file tree
Showing 14 changed files with 674 additions and 944 deletions.
42 changes: 32 additions & 10 deletions aws/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
chunk_util "github.com/weaveworks/cortex/pkg/chunk/util"
"github.com/weaveworks/cortex/pkg/util"
)

Expand Down Expand Up @@ -301,7 +302,11 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
return backoff.Err()
}

func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
func (a storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
return chunk_util.DoParallelQueries(ctx, a.query, queries, callback)
}

func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue})
defer sp.Finish()

Expand Down Expand Up @@ -371,7 +376,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c
return nil
}

func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) {
backoff := util.NewBackoff(ctx, a.cfg.backoffConfig)
defer func() {
dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries()))
Expand Down Expand Up @@ -401,7 +406,9 @@ func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput
}

queryOutput := page.Data().(*dynamodb.QueryOutput)
return dynamoDBReadResponse(queryOutput.Items), nil
return &dynamoDBReadResponse{
items: queryOutput.Items,
}, nil
}
return nil, fmt.Errorf("QueryPage error: %s for table %v, last error %v", backoff.Err(), *input.TableName, err)
}
Expand Down Expand Up @@ -785,18 +792,33 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e
}

// Slice of values returned; map key is attribute name
type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue
type dynamoDBReadResponse struct {
items []map[string]*dynamodb.AttributeValue
}

func (b *dynamoDBReadResponse) Iterator() chunk.ReadBatchIterator {
return &dynamoDBReadResponseIterator{
i: -1,
dynamoDBReadResponse: b,
}
}

type dynamoDBReadResponseIterator struct {
i int
*dynamoDBReadResponse
}

func (b dynamoDBReadResponse) Len() int {
return len(b)
func (b *dynamoDBReadResponseIterator) Next() bool {
b.i++
return b.i < len(b.items)
}

func (b dynamoDBReadResponse) RangeValue(i int) []byte {
return b[i][rangeKey].B
func (b *dynamoDBReadResponseIterator) RangeValue() []byte {
return b.items[b.i][rangeKey].B
}

func (b dynamoDBReadResponse) Value(i int) []byte {
chunkValue, ok := b[i][valueKey]
func (b *dynamoDBReadResponseIterator) Value() []byte {
chunkValue, ok := b.items[b.i][valueKey]
if !ok {
return nil
}
Expand Down
12 changes: 1 addition & 11 deletions cache/fifo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"
"time"

ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand Down Expand Up @@ -132,9 +130,6 @@ func (c *FifoCache) Stop() error {

// Put stores the value against the key.
func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put")
defer span.Finish()

c.entriesAdded.Inc()
if c.size == 0 {
return
Expand Down Expand Up @@ -202,9 +197,6 @@ func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {

// Get returns the stored value against the key and when the key was last updated.
func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-get")
defer span.Finish()

c.totalGets.Inc()
if c.size == 0 {
return nil, false
Expand All @@ -217,17 +209,15 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
if ok {
updated := c.entries[index].updated
if time.Now().Sub(updated) < c.validity {
span.LogFields(otlog.Bool("hit", true))

return c.entries[index].value, true
}

c.totalMisses.Inc()
c.staleGets.Inc()
span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", true))
return nil, false
}

span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", false))
c.totalMisses.Inc()
return nil, false
}
39 changes: 26 additions & 13 deletions cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/common/model"

"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/util"
)

const (
Expand Down Expand Up @@ -185,7 +186,11 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch)
return nil
}

func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
func (s *storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
return util.DoParallelQueries(ctx, s.query, queries, callback)
}

func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
var q *gocql.Query

switch {
Expand Down Expand Up @@ -218,7 +223,7 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery,
defer iter.Close()
scanner := iter.Scanner()
for scanner.Next() {
var b readBatch
b := &readBatch{}
if err := scanner.Scan(&b.rangeValue, &b.value); err != nil {
return errors.WithStack(err)
}
Expand All @@ -231,27 +236,35 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery,

// readBatch represents a batch of rows read from Cassandra.
type readBatch struct {
consumed bool
rangeValue []byte
value []byte
}

// Len implements chunk.ReadBatch; in Cassandra we 'stream' results back
// one-by-one, so this always returns 1.
func (readBatch) Len() int {
return 1
func (r *readBatch) Iterator() chunk.ReadBatchIterator {
return &readBatchIter{
readBatch: r,
}
}

type readBatchIter struct {
consumed bool
*readBatch
}

func (b readBatch) RangeValue(index int) []byte {
if index != 0 {
panic("index != 0")
func (b *readBatchIter) Next() bool {
if b.consumed {
return false
}
b.consumed = true
return true
}

func (b *readBatchIter) RangeValue() []byte {
return b.rangeValue
}

func (b readBatch) Value(index int) []byte {
if index != 0 {
panic("index != 0")
}
func (b *readBatchIter) Value() []byte {
return b.value
}

Expand Down
46 changes: 8 additions & 38 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,53 +347,23 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode
}

func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
incomingEntries := make(chan []IndexEntry)
incomingErrors := make(chan error)
for _, query := range queries {
go func(query IndexQuery) {
entries, err := c.lookupEntriesByQuery(ctx, query)
if err != nil {
incomingErrors <- err
} else {
incomingEntries <- entries
}
}(query)
}

// Combine the results into one slice
var entries []IndexEntry
var lastErr error
for i := 0; i < len(queries); i++ {
select {
case incoming := <-incomingEntries:
entries = append(entries, incoming...)
case err := <-incomingErrors:
lastErr = err
}
}

return entries, lastErr
}

func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) {
var entries []IndexEntry

if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) {
for i := 0; i < resp.Len(); i++ {
err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
iter := resp.Iterator()
for iter.Next() {
entries = append(entries, IndexEntry{
TableName: query.TableName,
HashValue: query.HashValue,
RangeValue: resp.RangeValue(i),
Value: resp.Value(i),
RangeValue: iter.RangeValue(),
Value: iter.Value(),
})
}
return true
}); err != nil {
})
if err != nil {
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err)
return nil, err
}

return entries, nil
return entries, err
}

func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {
Expand Down
Loading

0 comments on commit 311a376

Please sign in to comment.