Skip to content

Commit

Permalink
Do not query blocks store for time ranges already covered by ingesters
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed May 29, 2020
1 parent 6fec92a commit c733124
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Experimental TSDB: Use shared cache for metadata. This is especially useful when running multiple querier and store-gateway components to reduce number of object store API calls. #2626
* [ENHANCEMENT] Upgrade Thanos to [f7802edbf830](https://github.com/thanos-io/thanos/commit/f7802edbf830) and Prometheus to [f4dd45609a05](https://github.com/prometheus/prometheus/commit/f4dd45609a05) which is after v2.18.1. #2634
* TSDB now does memory-mapping of Head chunks and reduces memory usage.
* [ENHANCEMENT] Experimental TSDB: when `-querier.query-store-after` is configured and running the experimental blocks storage, the time range of the query sent to the store is now manipulated to ensure the query end time is not more recent than 'now - query-store-after'. #2642
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
Expand Down
5 changes: 4 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,10 @@ The `querier_config` configures the Cortex querier.
[query_ingesters_within: <duration> | default = 0s]
# The time after which a metric should only be queried from storage and not just
# ingesters. 0 means all queries are sent to store.
# ingesters. 0 means all queries are sent to store. When running the
# experimental blocks storage, if this option is enabled, the time range of the
# query sent to the store will be manipulated to ensure the query end is not
# more recent than 'now - query-store-after'.
# CLI flag: -querier.query-store-after
[query_store_after: <duration> | default = 0s]
Expand Down
26 changes: 26 additions & 0 deletions pkg/querier/block_meta.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package querier

import (
"fmt"
"strings"
"time"

"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/util"
)

// BlockMeta is a struct extending the Thanos block metadata and adding
Expand All @@ -17,6 +21,28 @@ type BlockMeta struct {
UploadedAt time.Time
}

// String returns a human-readable description of the block: its ULID followed
// by the block's min and max time, each converted with util.TimeFromMillis and
// rendered in UTC.
func (m BlockMeta) String() string {
	from := util.TimeFromMillis(m.MinTime).UTC().String()
	to := util.TimeFromMillis(m.MaxTime).UTC().String()

	return fmt.Sprintf("%s (min time: %s max time: %s)", m.ULID, from, to)
}

// BlockMetas is a list of BlockMeta pointers. It exists so that a whole set of
// blocks can be formatted at once via its String method.
type BlockMetas []*BlockMeta

// String renders every block of the list, in order, using BlockMeta.String
// and joins the results with ", ". An empty list yields an empty string.
func (s BlockMetas) String() string {
	parts := make([]string, 0, len(s))
	for _, meta := range s {
		parts = append(parts, meta.String())
	}

	return strings.Join(parts, ", ")
}

func getULIDsFromBlockMetas(metas []*BlockMeta) []ulid.ULID {
ids := make([]ulid.ULID, len(metas))
for i, m := range metas {
Expand Down
78 changes: 51 additions & 27 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"io"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -79,10 +80,11 @@ func (cfg *BlocksConsistencyCheckConfig) RegisterFlagsWithPrefix(prefix string,
type BlocksStoreQueryable struct {
services.Service

stores BlocksStoreSet
finder BlocksFinder
consistency *BlocksConsistencyChecker
logger log.Logger
stores BlocksStoreSet
finder BlocksFinder
consistency *BlocksConsistencyChecker
logger log.Logger
queryStoreAfter time.Duration

// Subservices manager.
subservices *services.Manager
Expand All @@ -92,7 +94,7 @@ type BlocksStoreQueryable struct {
storesHit prometheus.Histogram
}

func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consistency *BlocksConsistencyChecker, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consistency *BlocksConsistencyChecker, queryStoreAfter time.Duration, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
util.WarnExperimentalUse("Blocks storage engine")

manager, err := services.NewManager(stores, finder)
Expand All @@ -104,6 +106,7 @@ func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consist
stores: stores,
finder: finder,
consistency: consistency,
queryStoreAfter: queryStoreAfter,
logger: logger,
subservices: manager,
subservicesWatcher: services.NewFailureWatcher(),
Expand Down Expand Up @@ -180,7 +183,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
)
}

return NewBlocksStoreQueryable(stores, scanner, consistency, logger, reg)
return NewBlocksStoreQueryable(stores, scanner, consistency, querierCfg.QueryStoreAfter, logger, reg)
}

func (q *BlocksStoreQueryable) starting(ctx context.Context) error {
Expand Down Expand Up @@ -220,15 +223,16 @@ func (q *BlocksStoreQueryable) Querier(ctx context.Context, mint, maxt int64) (s
}

return &blocksStoreQuerier{
ctx: ctx,
minT: mint,
maxT: maxt,
userID: userID,
finder: q.finder,
stores: q.stores,
storesHit: q.storesHit,
consistency: q.consistency,
logger: q.logger,
ctx: ctx,
minT: mint,
maxT: maxt,
userID: userID,
finder: q.finder,
stores: q.stores,
storesHit: q.storesHit,
consistency: q.consistency,
logger: q.logger,
queryStoreAfter: q.queryStoreAfter,
}, nil
}

Expand All @@ -241,6 +245,10 @@ type blocksStoreQuerier struct {
storesHit prometheus.Histogram
consistency *BlocksConsistencyChecker
logger log.Logger

// If set, the querier manipulates the max time to not be greater than
// "now - queryStoreAfter" so that most recent blocks are not queried.
queryStoreAfter time.Duration
}

// Select implements storage.Querier interface.
Expand Down Expand Up @@ -279,29 +287,47 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
minT, maxT = sp.Start, sp.End
}

// If queryStoreAfter is enabled, we manipulate the query maxt so that samples are queried only
// up until now - queryStoreAfter, because the most recent time range is covered by ingesters.
// This optimization is particularly important for the blocks storage because it can be used to
// skip querying the most recent, not-yet-compacted blocks from the storage.
if q.queryStoreAfter > 0 {
now := time.Now()
origMaxT := maxT
maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))

if origMaxT != maxT {
level.Debug(spanLog).Log("msg", "query max time has been manipulated", "original", origMaxT, "updated", maxT)
}

if maxT < minT {
q.storesHit.Observe(0)
level.Debug(spanLog).Log("msg", "empty query time range after max time manipulation")
return series.NewEmptySeriesSet(), nil, nil
}
}

// Find the list of blocks we need to query given the time range.
metas, deletionMarks, err := q.finder.GetBlocks(q.userID, minT, maxT)
if err != nil {
return nil, nil, err
}

if len(metas) == 0 {
if q.storesHit != nil {
q.storesHit.Observe(0)
}

q.storesHit.Observe(0)
level.Debug(spanLog).Log("msg", "no blocks found")
return series.NewEmptySeriesSet(), nil, nil
}

blockIDs := getULIDsFromBlockMetas(metas)
level.Debug(spanLog).Log("expected blocks", blockIDs)
level.Debug(spanLog).Log("msg", "found blocks to query", "expected", BlockMetas(metas).String())

// Find the set of store-gateway instances having the blocks.
blockIDs := getULIDsFromBlockMetas(metas)
clients, err := q.stores.GetClientsFor(blockIDs)
if err != nil {
return nil, nil, err
}
level.Debug(spanLog).Log("num store-gateway instances", len(clients))
level.Debug(spanLog).Log("msg", "found store-gateway instances to query", "num instances", len(clients))

req := &storepb.SeriesRequest{
MinTime: minT,
Expand Down Expand Up @@ -361,7 +387,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
}
}

level.Debug(spanLog).Log("store-gateway", c, "num received series", len(mySeries), "bytes received series", countSeriesBytes(mySeries))
level.Debug(spanLog).Log("msg", "received series from store-gateway", "instance", c, "num series", len(mySeries), "bytes series", countSeriesBytes(mySeries))

// Store the result.
mtx.Lock()
Expand All @@ -379,10 +405,8 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
return nil, nil, err
}

level.Debug(spanLog).Log("queried blocks", queriedBlocks)
if q.storesHit != nil {
q.storesHit.Observe(float64(len(clients)))
}
level.Debug(spanLog).Log("msg", "received series from all store-gateways", "queried blocks", queriedBlocks)
q.storesHit.Observe(float64(len(clients)))

// Ensure all expected blocks have been queried.
if q.consistency != nil {
Expand Down
99 changes: 89 additions & 10 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,29 @@ import (
"github.com/gogo/protobuf/types"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

func TestBlocksStoreQuerier_SelectSorted(t *testing.T) {
const (
metricName = "test_metric"
minT = 10
maxT = 20
minT = int64(10)
maxT = int64(20)
)

var (
Expand Down Expand Up @@ -255,10 +259,8 @@ func TestBlocksStoreQuerier_SelectSorted(t *testing.T) {
mockedResult: testData.storeSetResult,
mockedErr: testData.storeSetErr,
}
finder := &blocksFinderMock{
mockedResult: testData.finderResult,
mockedErr: testData.finderErr,
}
finder := &blocksFinderMock{}
finder.On("GetBlocks", "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*metadata.DeletionMark(nil), testData.finderErr)

q := &blocksStoreQuerier{
ctx: ctx,
Expand All @@ -269,6 +271,7 @@ func TestBlocksStoreQuerier_SelectSorted(t *testing.T) {
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
storesHit: prometheus.NewHistogram(prometheus.HistogramOpts{}),
}

matchers := []*labels.Matcher{
Expand Down Expand Up @@ -313,6 +316,83 @@ func TestBlocksStoreQuerier_SelectSorted(t *testing.T) {
}
}

// TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter checks which
// time range selectSorted passes to the blocks finder for different
// queryStoreAfter settings. A case whose expected min and max time are both 0
// means the finder is expected not to be called at all.
func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) {
	now := time.Now()

	// minutesAgo converts "now - m minutes" with util.TimeToMillis.
	minutesAgo := func(m int64) int64 {
		return util.TimeToMillis(now.Add(-time.Duration(m) * time.Minute))
	}

	type testCase struct {
		queryStoreAfter time.Duration
		queryMinT       int64
		queryMaxT       int64
		expectedMinT    int64
		expectedMaxT    int64
	}

	cases := map[string]testCase{
		"should not manipulate query time range if queryStoreAfter is disabled": {
			queryStoreAfter: 0,
			queryMinT:       minutesAgo(100),
			queryMaxT:       minutesAgo(30),
			expectedMinT:    minutesAgo(100),
			expectedMaxT:    minutesAgo(30),
		},
		"should not manipulate query time range if queryStoreAfter is enabled but query max time is older": {
			queryStoreAfter: time.Hour,
			queryMinT:       minutesAgo(100),
			queryMaxT:       minutesAgo(70),
			expectedMinT:    minutesAgo(100),
			expectedMaxT:    minutesAgo(70),
		},
		"should manipulate query time range if queryStoreAfter is enabled and query max time is recent": {
			queryStoreAfter: time.Hour,
			queryMinT:       minutesAgo(100),
			queryMaxT:       minutesAgo(30),
			expectedMinT:    minutesAgo(100),
			expectedMaxT:    minutesAgo(60),
		},
		"should skip the query if the query min time is more recent than queryStoreAfter": {
			queryStoreAfter: time.Hour,
			queryMinT:       minutesAgo(50),
			queryMaxT:       minutesAgo(20),
			expectedMinT:    0,
			expectedMaxT:    0,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// The finder accepts any time range and reports no blocks; the test
			// only inspects the arguments it was called with.
			finder := &blocksFinderMock{}
			finder.On("GetBlocks", "user-1", mock.Anything, mock.Anything).Return([]*BlockMeta(nil), map[ulid.ULID]*metadata.DeletionMark(nil), error(nil))

			querier := &blocksStoreQuerier{
				ctx:             context.Background(),
				minT:            tc.queryMinT,
				maxT:            tc.queryMaxT,
				userID:          "user-1",
				finder:          finder,
				stores:          &blocksStoreSetMock{},
				consistency:     NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
				logger:          log.NewNopLogger(),
				storesHit:       prometheus.NewHistogram(prometheus.HistogramOpts{}),
				queryStoreAfter: tc.queryStoreAfter,
			}

			hints := &storage.SelectHints{
				Start: tc.queryMinT,
				End:   tc.queryMaxT,
			}

			_, _, err := querier.selectSorted(hints, nil)
			require.NoError(t, err)

			// Both expectations at zero mark the "query skipped" scenario.
			expectSkipped := tc.expectedMinT == 0 && tc.expectedMaxT == 0
			if expectSkipped {
				assert.Len(t, finder.Calls, 0)
				return
			}

			require.Len(t, finder.Calls, 1)
			callArgs := finder.Calls[0].Arguments
			assert.Equal(t, tc.expectedMinT, callArgs.Get(1))
			// The querier reads the clock on its own, so the max time is only
			// compared within a 5 second tolerance.
			assert.InDelta(t, tc.expectedMaxT, callArgs.Get(2), float64(5*time.Second.Milliseconds()))
		})
	}
}

type blocksStoreSetMock struct {
services.Service

Expand All @@ -326,13 +406,12 @@ func (m *blocksStoreSetMock) GetClientsFor(_ []ulid.ULID) ([]BlocksStoreClient,

type blocksFinderMock struct {
services.Service

mockedResult []*BlockMeta
mockedErr error
mock.Mock
}

func (m *blocksFinderMock) GetBlocks(userID string, minT, maxT int64) ([]*BlockMeta, map[ulid.ULID]*metadata.DeletionMark, error) {
return m.mockedResult, nil, m.mockedErr
args := m.Called(userID, minT, maxT)
return args.Get(0).([]*BlockMeta), args.Get(1).(map[ulid.ULID]*metadata.DeletionMark), args.Error(2)
}

type storeGatewayClientMock struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the experimental blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.")
f.StringVar(&cfg.StoreGatewayAddresses, "experimental.querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the experimental blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", defaultLookbackDelta, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
Expand Down

0 comments on commit c733124

Please sign in to comment.