Skip to content

Commit

Permalink
Bounded parallelism for DoParallelQueries (grafana#1053)
Browse files Browse the repository at this point in the history
* Bounded parallelism for DoParallelQueries

Limit the number of queries we will send in parallel to the back-end,
in case we have tens of thousands of series to fetch.

Signed-off-by: Bryan Boreham <bryan@weave.works>
  • Loading branch information
bboreham authored Oct 5, 2018
1 parent 07c5d7d commit 0a5f3fc
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"bytes"
"context"

ot "github.com/opentracing/opentracing-go"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util"
)

// DoSingleQuery is the interface for indexes that don't support batching yet.
Expand All @@ -13,20 +16,44 @@ type DoSingleQuery func(
callback func(chunk.ReadBatch) bool,
) error

// QueryParallelism is the maximum number of subqueries run in
// parallel per higher-level query
var QueryParallelism = 100

// DoParallelQueries translates between our interface for query batching,
// and indexes that don't yet support batching.
func DoParallelQueries(
ctx context.Context, doSingleQuery DoSingleQuery, queries []chunk.IndexQuery,
callback func(chunk.IndexQuery, chunk.ReadBatch) bool,
) error {
queue := make(chan chunk.IndexQuery)
incomingErrors := make(chan error)
for _, query := range queries {
go func(query chunk.IndexQuery) {
incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool {
return callback(query, r)
})
}(query)
n := util.Min(len(queries), QueryParallelism)
// Run n parallel goroutines fetching queries from the queue
for i := 0; i < n; i++ {
go func() {
sp, ctx := ot.StartSpanFromContext(ctx, "DoParallelQueries-worker")
defer sp.Finish()
for {
query, ok := <-queue
if !ok {
return
}
incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool {
return callback(query, r)
})
}
}()
}
// Send all the queries into the queue
go func() {
for _, query := range queries {
queue <- query
}
close(queue)
}()

// Now receive all the results.
var lastErr error
for i := 0; i < len(queries); i++ {
err := <-incomingErrors
Expand Down

0 comments on commit 0a5f3fc

Please sign in to comment.