
Mark posting group lazy if it has a lot of keys #7961

Merged · 8 commits · Dec 10, 2024
Changes from 4 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers.
- [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark a posting group as lazy if it exceeds the configured number of keys. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` to count lazy posting groups and the corresponding reasons.

### Changed

5 changes: 5 additions & 0 deletions cmd/thanos/store.go
@@ -101,6 +101,7 @@ type storeConfig struct {
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
lazyExpandedPostingsEnabled bool
PostingGroupMaxKeys int

indexHeaderLazyDownloadStrategy string
}
@@ -204,6 +205,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)

cmd.Flag("store.posting-group-max-keys", "Mark posting group as lazy if it fetches more keys than the configured number. Only valid if lazy expanded posting is enabled. 0 disables the limit.").
Member commented:
How could the user know what the appropriate number here should be? Could we calculate & use some pre-defined threshold?

Contributor (author) replied:

That's a good question... For us, we added some extra logging in our fork to record the cardinalities of the queries that triggered the lazy posting optimization, which gave us insight into the sizes involved. But logging that information in Thanos itself might be too expensive.

Do you think it would help if we introduced a native histogram to track posting groups' total posting sizes and numbers of keys?

I thought about making it a percentage based on block size, for example 1% of the number of series or 10% of the total label pairs, but I couldn't find a good way to define it. A percentage makes the threshold very small for small blocks, even though small blocks don't cause any impact.

Contributor (author) replied:

@GiedriusS One idea we had is to mark a group lazy if its number of keys is much larger than the maximum possible number of series for the query (i.e. the series matched by the smallest-cardinality posting group):

number of keys / max_series_to_match > R

For a query, if the matcher with the smallest postings can match at most 1000 series, then any single label can have at most 1000 distinct matching values. If the current posting group matches 100K keys, we keep at most 1K values and throw away 99K keys. Do you think this is a good metric? We could start with a threshold like 100 and tune it accordingly.
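A minimal sketch of that ratio heuristic, with assumed names and threshold (illustration only, not code from this PR):

```go
package main

import "fmt"

// markLazyByKeyRatio models the proposed rule: mark a posting group lazy
// when its key count is more than R times the largest number of series the
// query can match (taken from the smallest-cardinality posting group).
func markLazyByKeyRatio(groupKeys, maxSeriesToMatch, r int) bool {
	if maxSeriesToMatch <= 0 {
		return false
	}
	return groupKeys/maxSeriesToMatch > r
}

func main() {
	// The smallest posting group matches 1000 series, so a single label can
	// contribute at most 1000 useful values; a group with 500K keys would
	// discard most of them, so it is marked lazy at R=100.
	fmt.Println(markLazyByKeyRatio(500_000, 1_000, 100)) // true
	fmt.Println(markLazyByKeyRatio(50_000, 1_000, 100))  // false
}
```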

Default("0").IntVar(&sc.PostingGroupMaxKeys)

cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time.").
Default(string(indexheader.EagerDownloadStrategy)).
EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy))
@@ -429,6 +433,7 @@ func runStore(
return conf.estimatedMaxChunkSize
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeys(conf.PostingGroupMaxKeys),
store.WithIndexHeaderLazyDownloadStrategy(
indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(),
),
5 changes: 5 additions & 0 deletions docs/components/store.md
@@ -250,6 +250,11 @@ Flags:
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.posting-group-max-keys=0
Mark posting group as lazy if it fetches more
keys than the configured number. Only valid if
lazy expanded posting is enabled. 0 disables
the limit.
--sync-block-duration=15m Repeat interval for syncing the blocks between
local and remote view.
--tracing.config=<content>
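The documented behaviour of the limit reduces to a simple predicate; a hypothetical sketch (helper name assumed, not from this PR):

```go
package main

import "fmt"

// exceedsPostingGroupMaxKeys sketches the documented semantics of
// --store.posting-group-max-keys: with a positive limit, a posting group
// fetching more keys than the limit is marked lazy; 0 disables the check.
func exceedsPostingGroupMaxKeys(keys, maxKeys int) bool {
	return maxKeys > 0 && keys > maxKeys
}

func main() {
	fmt.Println(exceedsPostingGroupMaxKeys(150_000, 100_000)) // true: mark group lazy
	fmt.Println(exceedsPostingGroupMaxKeys(150_000, 0))       // false: limit disabled
}
```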
59 changes: 48 additions & 11 deletions pkg/store/bucket.go
@@ -151,6 +151,7 @@ type bucketStoreMetrics struct {
emptyPostingCount *prometheus.CounterVec

lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupsByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter

@@ -345,6 +346,11 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Help: "Total number of times when lazy expanded posting optimization applies.",
})

m.lazyExpandedPostingGroupsByReason = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_lazy_expanded_posting_groups_total",
Help: "Total number of posting groups that are marked as lazy and corresponding reason",
}, []string{"reason"})
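// Illustrative note (not part of this diff): call sites that mark a group
// lazy would record why via the standard client_golang API, e.g.
//   m.lazyExpandedPostingGroupsByReason.WithLabelValues("keys_limit").Inc()
// where "keys_limit" is an assumed reason string.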

m.lazyExpandedPostingSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_lazy_expanded_posting_size_bytes_total",
Help: "Total number of lazy posting group size in bytes.",
@@ -420,6 +426,7 @@ type BucketStore struct {
enableChunkHashCalculation bool

enabledLazyExpandedPostings bool
postingGroupMaxKeys int

sortingStrategy sortingStrategy

@@ -552,6 +559,13 @@ func WithLazyExpandedPostings(enabled bool) BucketStoreOption {
}
}

// WithPostingGroupMaxKeys configures a threshold to mark a posting group as lazy if it has more add keys than the threshold.
func WithPostingGroupMaxKeys(postingGroupMaxKeys int) BucketStoreOption {
return func(s *BucketStore) {
s.postingGroupMaxKeys = postingGroupMaxKeys
}
}

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
@@ -1002,8 +1016,11 @@ type blockSeriesClient struct {
chunksLimiter ChunksLimiter
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
lazyExpandedPostingEnabled bool
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeys int
lazyExpandedPostingsCount prometheus.Counter
lazyExpandedPostingGroupByReason *prometheus.CounterVec
lazyExpandedPostingSizeBytes prometheus.Counter
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter

@@ -1046,7 +1063,9 @@ func newBlockSeriesClient(
chunkFetchDurationSum *prometheus.HistogramVec,
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
postingGroupMaxKeys int,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingSeriesOverfetchedSizeBytes prometheus.Counter,
tenant string,
Expand Down Expand Up @@ -1081,7 +1100,9 @@ func newBlockSeriesClient(
chunkFetchDurationSum: chunkFetchDurationSum,

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
postingGroupMaxKeys: postingGroupMaxKeys,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
lazyExpandedPostingSizeBytes: lazyExpandedPostingSizeBytes,
lazyExpandedPostingSeriesOverfetchedSizeBytes: lazyExpandedPostingSeriesOverfetchedSizeBytes,

@@ -1133,7 +1154,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeys, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
@@ -1566,7 +1587,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.chunkFetchDurationSum,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeys,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes,
tenant,
@@ -1880,7 +1903,9 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeys,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes,
tenant,
@@ -2106,7 +2131,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.enabledLazyExpandedPostings,
s.postingGroupMaxKeys,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
s.metrics.lazyExpandedPostingSizeBytes,
s.metrics.lazyExpandedPostingSeriesOverfetchedSizeBytes,
tenant,
@@ -2563,7 +2590,16 @@ func (r *bucketIndexReader) reset(size int) {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
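// (Illustrative example, not part of this diff: for matchers
// {job="api", region!="us"} one posting group is built per label name; the
// job group adds postings for "api", while the region group starts from all
// postings and subtracts those for "us". With lazy expansion enabled,
// fetching an expensive group's postings may be deferred.)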
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) {
func (r *bucketIndexReader) ExpandedPostings(
ctx context.Context,
ms sortedMatchers,
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
postingGroupMaxKeys int,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
tenant string,
) (*lazyExpandedPostings, error) {
// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
// matchers specified, and we don't need to fetch expanded postings from cache.
if len(ms) == 0 {
@@ -2615,7 +2651,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeys, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
@@ -2661,13 +2697,14 @@ func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storag
// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels
// This computation happens in ExpandedPostings.
type postingGroup struct {
addAll bool
name string
matchers []*labels.Matcher
addKeys []string
removeKeys []string
cardinality int64
lazy bool
addAll bool
name string
matchers []*labels.Matcher
addKeys []string
removeKeys []string
cardinality int64
existentKeys int
lazy bool
}

func newPostingGroup(addAll bool, name string, addKeys, removeKeys []string) *postingGroup {
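To make the addAll/addKeys/removeKeys semantics above concrete, here is a small self-contained model (illustration only; real postings are storage.SeriesRef streams, not int slices):

```go
package main

import (
	"fmt"
	"sort"
)

// expand models the documented postingGroup rules: with addAll unset, the
// result is the merge of postings for addKeys minus postings for removeKeys;
// with addAll set, it is all postings minus postings for removeKeys.
func expand(addAll bool, postings map[string][]int, all []int, addKeys, removeKeys []string) []int {
	keep := map[int]bool{}
	if addAll {
		for _, p := range all {
			keep[p] = true
		}
	} else {
		for _, k := range addKeys {
			for _, p := range postings[k] {
				keep[p] = true
			}
		}
	}
	for _, k := range removeKeys {
		for _, p := range postings[k] {
			delete(keep, p)
		}
	}
	out := make([]int, 0, len(keep))
	for p := range keep {
		out = append(out, p)
	}
	sort.Ints(out)
	return out
}

func main() {
	postings := map[string][]int{"api": {1, 3}, "web": {2}} // label value -> series refs
	all := []int{1, 2, 3}
	fmt.Println(expand(false, postings, all, []string{"api"}, nil)) // job="api":  [1 3]
	fmt.Println(expand(true, postings, all, nil, []string{"api"}))  // job!="api": [2]
}
```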
30 changes: 21 additions & 9 deletions pkg/store/bucket_test.go
@@ -1288,7 +1288,9 @@ func benchmarkExpandedPostings(
{`uniq=~"9|random-shuffled-values|1"`, []*labels.Matcher{iRegexBigValueSet}, bigValueSetSize},
}

dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"})
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
for _, c := range cases {
t.Run(c.name, func(t testutil.TB) {
b := &bucketBlock{
Expand All @@ -1304,7 +1306,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
@@ -1340,8 +1342,10 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
// Match nothing.
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
ctx := context.Background()
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
@@ -1378,8 +1382,10 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*")
matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")
ctx := context.Background()
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant)
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
@@ -2872,7 +2878,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
wg := sync.WaitGroup{}
wg.Add(concurrency)

dummyCounter := promauto.NewCounter(prometheus.CounterOpts{Name: "test"})
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
for w := 0; w < concurrency; w++ {
go func() {
defer wg.Done()
@@ -2917,7 +2925,9 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
false,
0,
dummyCounter,
dummyCounterVec,
dummyCounter,
dummyCounter,
tenancy.DefaultTenant,
@@ -3551,7 +3561,9 @@ func TestExpandedPostingsRace(t *testing.T) {

l := sync.Mutex{}
previousRefs := make(map[int][]storage.SeriesRef)
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})

for {
if tm.Err() != nil {
Expand All @@ -3573,7 +3585,7 @@ func TestExpandedPostingsRace(t *testing.T) {
wg.Add(1)

go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()
