Receiver: cache matchers for series calls (#7353)
* Receiver: cache matchers for series calls

We have tried caching matchers before with a time-based expiration cache; this time we are trying an LRU cache.

We saw some of our receivers spending a lot of CPU time compiling regexes, similar to the profile of the benchmark I added here:

* Adds a matcher cache for the `MatchersToPromMatchers` method, plus a new version of that method which uses the cache.
* The main change is in the `matchesExternalLabels` function, which now receives a cache instance.
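
As a rough illustration of the idea described above (not the code from this commit), a size-bounded LRU cache of compiled regexes could look like the sketch below. The names `regexLRU` and `getOrCompile` are hypothetical and only the standard library is used; the real cache lives in the `storecache` package and may differ substantially.

```go
package main

import (
	"container/list"
	"fmt"
	"regexp"
	"sync"
)

// entry is what each list element stores: the cache key and the compiled value.
type entry struct {
	key string
	re  *regexp.Regexp
}

// regexLRU is a hypothetical, minimal LRU cache of compiled regular expressions.
type regexLRU struct {
	mu    sync.Mutex
	size  int
	order *list.List               // front = most recently used
	items map[string]*list.Element // pattern -> element in order
}

func newRegexLRU(size int) *regexLRU {
	return &regexLRU{size: size, order: list.New(), items: make(map[string]*list.Element)}
}

// getOrCompile returns the cached regexp for pattern, compiling it on a miss.
// The boolean reports whether the value came from the cache.
// For simplicity the lock is held while compiling; a real cache would likely avoid that.
func (c *regexLRU) getOrCompile(pattern string) (*regexp.Regexp, bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if el, ok := c.items[pattern]; ok {
		c.order.MoveToFront(el) // mark as most recently used
		return el.Value.(*entry).re, true, nil
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, false, err
	}
	c.items[pattern] = c.order.PushFront(&entry{key: pattern, re: re})
	if c.order.Len() > c.size {
		// Evict the least recently used entry.
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	return re, false, nil
}

func main() {
	cache := newRegexLRU(2)
	for _, p := range []string{"^a.*$", "^b.*$", "^a.*$", "^c.*$", "^b.*$"} {
		_, hit, err := cache.getOrCompile(p)
		fmt.Println(p, "hit:", hit, "err:", err)
	}
}
```

Unlike a time-based expiration cache, entries here are only dropped when the cache is full, so frequently repeated patterns stay compiled for as long as they keep being used.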

adding matcher cache and refactor matchers

Co-authored-by: Andre Branchizio <andre.branchizio@shopify.com>

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Using the cache in proxy and tsdb stores (only receiver)

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

fixing problem with deep equality

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

adding some docs

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Adding benchmark

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

undo unnecessary changes

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Adjusting metric names

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

adding changelog

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

wiring changes to the receiver

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Fixing linting

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

docs

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* using singleflight to get or set items

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* improve metrics

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* Introduce interface for matchers cache

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* fixing unit test

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* adding changelog

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* fixing benchmark

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* moving matcher cache to storecache package

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* Trying to make the cache more reusable introducing interface

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Fixing problem with wrong initialization

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

Moving interface to storecache package

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

remove empty file and fix calls to constructor passing nil;

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Fix false entry on change log

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* Removing default value for registry and rename test file

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* Using fmt.Errorf()

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* Remove method that is not on interface anymore

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* Remove duplicate get call

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

---------

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
pedro-stanaka authored Jan 3, 2025
1 parent ca40906 commit 626d0e5
Showing 17 changed files with 478 additions and 85 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons.
- [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options
- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls.

### Changed

20 changes: 17 additions & 3 deletions cmd/thanos/receive.go
@@ -27,12 +27,11 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/wlog"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
@@ -50,6 +49,7 @@ import (
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
@@ -225,6 +225,15 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

var cache = storecache.NewNoopMatcherCache()
if conf.matcherCacheSize > 0 {
cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg))
if err != nil {
return errors.Wrap(err, "failed to create matchers cache")
}
multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache))
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
@@ -345,6 +354,7 @@ func runReceive(

options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
store.WithMatcherCache(cache),
store.WithoutDedup(),
}

@@ -893,6 +903,8 @@ type receiveConfig struct {

asyncForwardWorkerCount uint

matcherCacheSize int

featureList *[]string

headExpandedPostingsCacheSize uint64
@@ -1046,6 +1058,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
"about order.").
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
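
The wiring in this file starts from a no-op cache and only swaps in a real one when `--matcher-cache-size` is greater than 0. A minimal sketch of that pattern follows, assuming a hypothetical `matcherCache` interface and a deliberately naive, non-concurrency-safe map-backed implementation; none of these names are the Thanos API.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// matcherCache is a hypothetical stand-in for an interface such as storecache.MatchersCache.
type matcherCache interface {
	getOrSet(pattern string, build func() (*regexp.Regexp, error)) (*regexp.Regexp, error)
}

// noopCache stores nothing: every call runs build.
type noopCache struct{}

func (noopCache) getOrSet(_ string, build func() (*regexp.Regexp, error)) (*regexp.Regexp, error) {
	return build()
}

// mapCache keeps up to size entries and never evicts (illustration only, not thread-safe).
type mapCache struct {
	size  int
	items map[string]*regexp.Regexp
}

func (c *mapCache) getOrSet(pattern string, build func() (*regexp.Regexp, error)) (*regexp.Regexp, error) {
	if re, ok := c.items[pattern]; ok {
		return re, nil
	}
	re, err := build()
	if err != nil {
		return nil, err
	}
	if len(c.items) < c.size {
		c.items[pattern] = re
	}
	return re, nil
}

// newCache mirrors the flag semantics: 0 disables caching, a positive size enables it.
func newCache(size int) (matcherCache, error) {
	if size < 0 {
		return nil, errors.New("cache size must not be negative")
	}
	if size == 0 {
		return noopCache{}, nil
	}
	return &mapCache{size: size, items: make(map[string]*regexp.Regexp)}, nil
}

// lookupTwice resolves the same pattern twice and reports how often it was compiled.
func lookupTwice(c matcherCache, pattern string) (int, error) {
	compiles := 0
	build := func() (*regexp.Regexp, error) {
		compiles++
		return regexp.Compile(pattern)
	}
	for i := 0; i < 2; i++ {
		if _, err := c.getOrSet(pattern, build); err != nil {
			return compiles, err
		}
	}
	return compiles, nil
}

func main() {
	for _, size := range []int{0, 8} {
		cache, err := newCache(size)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		n, err := lookupTwice(cache, "^up.*$")
		fmt.Printf("size=%d compiles=%d err=%v\n", size, n, err)
	}
}
```

Because callers only see the interface, code paths such as the proxy store never need to check whether caching is enabled.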
2 changes: 2 additions & 0 deletions docs/components/receive.md
@@ -407,6 +407,8 @@ Flags:
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--log.level=info Log filtering level.
--matcher-cache-size=0 The size of the cache used for matching against
external labels. Using 0 disables caching.
--objstore.config=<content>
Alternative to 'objstore.config-file'
flag (mutually exclusive). Content of
6 changes: 5 additions & 1 deletion pkg/query/query_test.go
@@ -14,8 +14,10 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil/custom"
@@ -54,6 +56,8 @@ func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)
cache, err := storecache.NewMatchersCache()
testutil.Ok(t, err)

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("proxy", func(t *testing.T) {
@@ -62,7 +66,7 @@ func TestQuerier_Proxy(t *testing.T) {
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return sc.get() },
component.Debug, nil, 5*time.Minute, store.EagerRetrieval),
component.Debug, nil, 5*time.Minute, store.EagerRetrieval, store.WithMatcherCache(cache)),
1000000,
5*time.Minute,
)
3 changes: 1 addition & 2 deletions pkg/receive/handler_test.go
@@ -6,6 +6,7 @@ package receive
import (
"bytes"
"context"
goerrors "errors"
"fmt"
"io"
"math"
@@ -24,8 +25,6 @@ import (

"gopkg.in/yaml.v3"

goerrors "errors"

"github.com/alecthomas/units"
"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
23 changes: 17 additions & 6 deletions pkg/receive/multitsdb.go
@@ -18,16 +18,14 @@ import (
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/objstore"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -39,6 +37,7 @@ import (
"github.com/thanos-io/thanos/pkg/receive/expandedpostingscache"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
@@ -64,6 +63,8 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

matcherCache storecache.MatchersCache

tsdbClients []store.Client
exemplarClients map[string]*exemplars.TSDB

@@ -95,6 +96,12 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption {
}
}

func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption {
return func(s *MultiTSDB) {
s.matcherCache = cache
}
}

// NewMultiTSDB creates new MultiTSDB.
// NOTE: Passed labels must be sorted lexicographically (alphabetically).
func NewMultiTSDB(
@@ -127,6 +134,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
matcherCache: storecache.NewNoopMatcherCache(),
}

for _, option := range options {
@@ -755,10 +763,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
options := []store.TSDBStoreOption{}
var options []store.TSDBStoreOption
if t.metricNameFilterEnabled {
options = append(options, store.WithCuckooMetricNameStoreFilter())
}
if t.matcherCache != nil {
options = append(options, store.WithMatcherCacheInstance(t.matcherCache))
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
level.Info(logger).Log("msg", "TSDB is now ready")
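
The `WithMatchersCache` option together with the no-op default set in `NewMultiTSDB` follows the functional-options pattern. A small self-contained sketch of that pattern is shown below; the `store`, `option`, and cache types are hypothetical placeholders, not the types used in this change.

```go
package main

import "fmt"

// cache is a hypothetical placeholder for a matchers cache interface.
type cache interface {
	name() string
}

type noopCache struct{}

func (noopCache) name() string { return "noop" }

type lruCache struct{ size int }

func (l lruCache) name() string { return fmt.Sprintf("lru(%d)", l.size) }

// store stands in for a component that accepts an optional cache.
type store struct {
	cache cache
}

// option mutates a store during construction.
type option func(*store)

// withCache overrides the default cache.
func withCache(c cache) option {
	return func(s *store) { s.cache = c }
}

// newStore sets a safe default first, then applies options in order,
// so the cache field is never nil even when no option is passed.
func newStore(opts ...option) *store {
	s := &store{cache: noopCache{}}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	fmt.Println(newStore().cache.name())
	fmt.Println(newStore(withCache(lruCache{size: 16})).cache.name())
}
```

Setting the default before applying options means existing callers of the constructor keep working unchanged, which is presumably why the tests in this commit did not need to pass a cache explicitly.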
42 changes: 14 additions & 28 deletions pkg/receive/multitsdb_test.go
@@ -46,21 +46,14 @@ func TestMultiTSDB(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("run fresh", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
MaxExemplars: 100,
EnableExemplarStorage: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
MaxExemplars: 100,
EnableExemplarStorage: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
@@ -175,19 +168,12 @@ func TestMultiTSDB(t *testing.T) {

t.Run("flush with one sample produces a block", func(t *testing.T) {
const testTenant = "test_tenant"
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
},
labels.FromStrings("replica", "01"),
"tenant_id",
nil,
false,
metadata.NoneFunc,
)
m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
NoLockfile: true,
}, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc)
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
5 changes: 2 additions & 3 deletions pkg/receive/receive_test.go
@@ -8,13 +8,12 @@ import (
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
3 changes: 1 addition & 2 deletions pkg/receive/writer_test.go
@@ -10,8 +10,6 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/receive/writecapnp"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
@@ -24,6 +22,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/receive/writecapnp"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"