Skip to content

Commit

Permalink
Merge pull request #128 from vinted/syncblocks_hotfix
Browse files Browse the repository at this point in the history
store: lock around iterating over s.blocks
  • Loading branch information
GiedriusS authored Feb 3, 2025
2 parents 26e1654 + cdd155d commit dca52a7
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 48 deletions.
73 changes: 41 additions & 32 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
go func() {
for meta := range blockc {
if err := s.addBlock(ctx, meta); err != nil {
level.Warn(s.logger).Log("msg", "adding block failed", "err", err, "id", meta.ULID.String())
continue
}
}
Expand All @@ -694,17 +695,32 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
return metaFetchErr
}

var cleanupBlocks []*bucketBlock
s.mtx.RLock()
keys := make([]ulid.ULID, 0, len(s.blocks))
for k := range s.blocks {
keys = append(keys, k)
}
s.mtx.RUnlock()

// Drop all blocks that are no longer present in the bucket.
for id := range s.blocks {
for _, id := range keys {
if _, ok := metas[id]; ok {
continue
}
if err := s.removeBlock(id); err != nil {
level.Warn(s.logger).Log("msg", "drop of outdated block failed", "block", id, "err", err)
s.metrics.blockDropFailures.Inc()
}
level.Info(s.logger).Log("msg", "dropped outdated block", "block", id)

s.mtx.Lock()
b := s.blocks[id]
lset := labels.FromMap(b.meta.Thanos.Labels)
s.blockSets[lset.Hash()].remove(id)
delete(s.blocks, id)
s.mtx.Unlock()

s.metrics.blocksLoaded.Dec()
s.metrics.blockDrops.Inc()
cleanupBlocks = append(cleanupBlocks, b)

level.Info(s.logger).Log("msg", "dropped outdated block", "block", id)
}

// Sync advertise labels.
Expand All @@ -717,6 +733,25 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
return strings.Compare(s.advLabelSets[i].String(), s.advLabelSets[j].String()) < 0
})
s.mtx.Unlock()

go func() {
for _, b := range cleanupBlocks {
var errs prometheus.MultiError

errs.Append(b.Close())

if b.dir != "" {
errs.Append(os.RemoveAll(b.dir))
}

if len(errs) == 0 {
return
}

level.Warn(s.logger).Log("msg", "close of outdated block failed", "block", b.meta.ULID.String(), "err", errs.Error())
s.metrics.blockDropFailures.Inc()
}
}()
return nil
}

Expand Down Expand Up @@ -849,32 +884,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
return nil
}

func (s *BucketStore) removeBlock(id ulid.ULID) error {
s.mtx.Lock()
b, ok := s.blocks[id]
if ok {
lset := labels.FromMap(b.meta.Thanos.Labels)
s.blockSets[lset.Hash()].remove(id)
delete(s.blocks, id)
}
s.mtx.Unlock()

if !ok {
return nil
}

s.metrics.blocksLoaded.Dec()
if err := b.Close(); err != nil {
return errors.Wrap(err, "close block")
}

if b.dir == "" {
return nil
}

return os.RemoveAll(b.dir)
}

// TimeRange returns the minimum and maximum timestamp of data available in the store.
func (s *BucketStore) TimeRange() (mint, maxt int64) {
s.mtx.RLock()
Expand Down
5 changes: 4 additions & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1780,7 +1780,10 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
testutil.Equals(t, numSeries, len(srv.SeriesSet))
})
t.Run("remove second block. Cache stays. Ask for first again.", func(t *testing.T) {
testutil.Ok(t, store.removeBlock(b2.meta.ULID))
b, _ := store.blocks[b2.meta.ULID]
lset := labels.FromMap(b.meta.Thanos.Labels)
store.blockSets[lset.Hash()].remove(b2.meta.ULID)
delete(store.blocks, b2.meta.ULID)

srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, store.Series(&storepb.SeriesRequest{
Expand Down
41 changes: 26 additions & 15 deletions test/e2e/store_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/cortexproject/promqlsmith"
"github.com/efficientgo/core/backoff"
"github.com/efficientgo/core/testutil"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
Expand Down Expand Up @@ -816,7 +817,13 @@ metafile_content_ttl: 0s`
// thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta.
for _, st := range []*e2eobs.Observable{store1, store2, store3} {
t.Run(st.Name(), func(t *testing.T) {
testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced"))
testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_blocks_meta_synced"}, e2emon.WaitMissingMetrics(), e2emon.WithWaitBackoff(
&backoff.Config{
Min: 1 * time.Second,
Max: 10 * time.Second,
MaxRetries: 30,
},
)))
testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, st.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded"))
Expand All @@ -826,23 +833,27 @@ metafile_content_ttl: 0s`
}

t.Run("query with groupcache loading from object storage", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
{
"a": "1",
"b": "2",
"ext1": "value1",
"replica": "1",
for i := 0; i < 3; i++ {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
},
)
[]model.Metric{
{
"a": "1",
"b": "2",
"ext1": "value1",
"replica": "1",
},
},
)
}

for _, st := range []*e2eobs.Observable{store1, store2, store3} {
testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_cache_groupcache_loads_total`}))
testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_store_bucket_cache_operation_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "config", "chunks"))))
t.Run(st.Name(), func(t *testing.T) {
testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_cache_groupcache_loads_total`}))
testutil.Ok(t, st.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{`thanos_store_bucket_cache_operation_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "config", "chunks"))))
})
}
})

Expand Down

0 comments on commit dca52a7

Please sign in to comment.