From f2208a9820c74f665d7706184c10fbc74f58df2d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Fri, 3 May 2024 12:21:27 +0300
Subject: [PATCH 1/2] e2e/compact: add repro for issue #6775
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a minimal test case for issue #6775 that reproduces the panic in
the compactor.

Signed-off-by: Giedrius Statkevičius
---
 test/e2e/compact_test.go       | 77 ++++++++++++++++++++++++++++++++++
 test/e2e/e2ethanos/services.go | 30 +++++++++++++
 2 files changed, 107 insertions(+)

diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index daa2fa615e..94a3f344fe 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -924,3 +924,80 @@ func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
 	testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))
 }
+
+// TestCompactorIssue6775 tests that the compactor does not crash when
+// compacting 5m downsampled blocks with some overlap.
+func TestCompactorIssue6775(t *testing.T) {
+	const minTime = 1710374400014
+	const maxTime = 1711584000000
+
+	logger := log.NewNopLogger()
+	e, err := e2e.NewDockerEnvironment("c-issue6775")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	dir := filepath.Join(e.SharedDir(), "tmp")
+	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
+
+	const bucket = "compact-test"
+	m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+	bkt, err := s3.NewBucketWithConfig(logger,
+		e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
+	testutil.Ok(t, err)
+
+	baseBlockDesc := blockDesc{
+		series: []labels.Labels{
+			labels.FromStrings("z", "1", "b", "2"),
+			labels.FromStrings("z", "1", "b", "5"),
+		},
+		extLset: labels.FromStrings("case", "downsampled-block-with-overlap"),
+		mint:    minTime,
+		maxt:    maxTime,
+	}
+
+	for i := 0; i < 2; i++ {
+		rawBlockID, err := baseBlockDesc.Create(context.Background(), dir, 0, metadata.NoneFunc, 1200+i)
+		testutil.Ok(t, err)
+		testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, rawBlockID.String()), rawBlockID.String()))
+	}
+
+	// Downsample them first.
+	bds := e2ethanos.NewToolsBucketDownsample(e, "downsample", client.BucketConfig{
+		Type:   client.S3,
+		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
+	})
+	testutil.Ok(t, bds.Start())
+
+	// NOTE(GiedriusS): can't use WaitSumMetrics here because the e2e library doesn't
+	// work well with histograms.
+	testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stderr), 1*time.Second, make(<-chan struct{}), func() (rerr error) {
+		resp, err := http.Get(fmt.Sprintf("http://%s/metrics", bds.Endpoint("http")))
+		if err != nil {
+			return fmt.Errorf("getting metrics: %w", err)
+		}
+		defer runutil.CloseWithErrCapture(&rerr, resp.Body, "close body")
+
+		b, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return fmt.Errorf("reading metrics: %w", err)
+		}
+
+		if !bytes.Contains(b, []byte(`thanos_compact_downsample_duration_seconds_count{resolution="0"} 2`)) {
+			return fmt.Errorf("failed to find the right downsampling metric")
+		}
+
+		return nil
+	}))
+
+	testutil.Ok(t, bds.Stop())
+
+	// Run the compactor.
+	c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{
+		Type:   client.S3,
+		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
+	}, nil, "--compact.enable-vertical-compaction")
+	testutil.NotOk(t, e2e.StartAndWaitReady(c))
+	testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics()))
+}
diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go
index 07141a4ac7..8d2174b6c1 100644
--- a/test/e2e/e2ethanos/services.go
+++ b/test/e2e/e2ethanos/services.go
@@ -1217,3 +1217,33 @@ func NewRedis(e e2e.Environment, name string) e2e.Runnable {
 		},
 	)
 }
+
+func NewToolsBucketDownsample(e e2e.Environment, name string, bucketConfig client.BucketConfig) *e2eobs.Observable {
+	f := e.Runnable(fmt.Sprintf("downsampler-%s", name)).
+		WithPorts(map[string]int{"http": 8080}).
+		Future()
+
+	bktConfigBytes, err := yaml.Marshal(bucketConfig)
+	if err != nil {
+		return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "generate store config file: %v", bucketConfig))}
+	}
+
+	args := []string{"bucket", "downsample"}
+
+	args = append(args, e2e.BuildArgs(map[string]string{
+		"--http-address":    ":8080",
+		"--log.level":       "debug",
+		"--objstore.config": string(bktConfigBytes),
+		"--data-dir":        f.InternalDir(),
+	})...)
+
+	return e2eobs.AsObservable(f.Init(
+		e2e.StartOptions{
+			Image:            DefaultImage(),
+			Command:          e2e.NewCommand("tools", args...),
+			User:             strconv.Itoa(os.Getuid()),
+			Readiness:        e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200),
+			WaitReadyBackoff: &defaultBackoffConfig,
+		},
+	), "http")
+}

From 6906f18c6c57c8240c629738514a07798d1b9032 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Fri, 3 May 2024 15:00:43 +0300
Subject: [PATCH 2/2] compact/planner: fix issue 6775
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

It doesn't make sense to vertically compact downsampled blocks, so mark
them with the no-compact marker when downsampled blocks are detected in
the plan. The Planner seems like the best place for this logic - I just
repeated the existing pattern used by the large index file filter.
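
In rough terms, the planner selection in cmd/thanos/compact.go ends up
as in the sketch below. This is a simplified sketch, not the literal
diff; all names come from the change itself, and the surrounding
plumbing (logger, levels, insBkt, conf, compactMetrics) is omitted:

    var planner compact.Planner

    tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
    largeIndexFilterPlanner := compact.WithLargeTotalIndexSizeFilter(
        tsdbPlanner,
        insBkt,
        int64(conf.maxBlockIndexSize),
        compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
    )
    if enableVerticalCompaction {
        // With vertical compaction enabled, overlapping downsampled blocks are
        // marked with no-compact-mark.json (reason: downsample-vertical-compaction)
        // at planning time instead of being planned for compaction.
        planner = compact.WithVerticalCompactionDownsampleFilter(
            largeIndexFilterPlanner,
            insBkt,
            compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.DownsampleVerticalCompactionNoCompactReason),
        )
    } else {
        planner = largeIndexFilterPlanner
    }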

Signed-off-by: Giedrius Statkevičius
---
 cmd/thanos/compact.go         |  9 ++++-
 pkg/block/metadata/markers.go |  2 +
 pkg/compact/planner.go        | 74 +++++++++++++++++++++++++++++++++--
 test/e2e/compact_test.go      |  6 +--
 4 files changed, 84 insertions(+), 7 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 8923bd376e..06e9a5d27a 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -369,13 +369,20 @@ func runCompact(
 		conf.blockFilesConcurrency,
 		conf.compactBlocksFetchConcurrency,
 	)
+	var planner compact.Planner
+
 	tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
-	planner := compact.WithLargeTotalIndexSizeFilter(
+	largeIndexFilterPlanner := compact.WithLargeTotalIndexSizeFilter(
 		tsdbPlanner,
 		insBkt,
 		int64(conf.maxBlockIndexSize),
 		compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
 	)
+	if enableVerticalCompaction {
+		planner = compact.WithVerticalCompactionDownsampleFilter(largeIndexFilterPlanner, insBkt, compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.DownsampleVerticalCompactionNoCompactReason))
+	} else {
+		planner = largeIndexFilterPlanner
+	}
 	blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
 	compactor, err := compact.NewBucketCompactor(
 		logger,
diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go
index 83273eb343..0a351a5fab 100644
--- a/pkg/block/metadata/markers.go
+++ b/pkg/block/metadata/markers.go
@@ -79,6 +79,8 @@ const (
 	IndexSizeExceedingNoCompactReason = "index-size-exceeding"
 	// OutOfOrderChunksNoCompactReason is a reason of to no compact block with index contains out of order chunk so that the compaction is not blocked.
 	OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
+	// DownsampleVerticalCompactionNoCompactReason is a reason to not vertically compact overlapping downsampled blocks because the result is not well defined, e.g. how would averages be vertically compacted?
+	DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
 )
 
 // NoCompactMark marker stores reason of block being excluded from compaction if needed.
diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go
index 783191cacf..6d7d03eea2 100644
--- a/pkg/compact/planner.go
+++ b/pkg/compact/planner.go
@@ -234,6 +234,67 @@ type largeTotalIndexSizeFilter struct {
 
 var _ Planner = &largeTotalIndexSizeFilter{}
 
+type verticalCompactionDownsampleFilter struct {
+	bkt                objstore.Bucket
+	markedForNoCompact prometheus.Counter
+
+	*largeTotalIndexSizeFilter
+}
+
+var _ Planner = &verticalCompactionDownsampleFilter{}
+
+func WithVerticalCompactionDownsampleFilter(with *largeTotalIndexSizeFilter, bkt objstore.Bucket, markedForNoCompact prometheus.Counter) Planner {
+	return &verticalCompactionDownsampleFilter{
+		markedForNoCompact:        markedForNoCompact,
+		bkt:                       bkt,
+		largeTotalIndexSizeFilter: with,
+	}
+}
+
+func (v *verticalCompactionDownsampleFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
+	noCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, 0)
+PlanLoop:
+	for {
+		plan, err := v.plan(ctx, noCompactMarked, metasByMinTime)
+		if err != nil {
+			return nil, err
+		}
+
+		if len(selectOverlappingMetas(plan)) == 0 {
+			return plan, nil
+		}
+
+		// If the plan contains downsampled blocks, mark them as no-compact because they cannot be vertically compacted.
+		// Technically, the resolution is part of the group key, but we do not rely on that level of detail here.
+		var marked = false
+		for _, m := range plan {
+			if m.Thanos.Downsample.Resolution == 0 {
+				continue
+			}
+			if err := block.MarkForNoCompact(
+				ctx,
+				v.logger,
+				v.bkt,
+				m.ULID,
+				metadata.DownsampleVerticalCompactionNoCompactReason,
+				"verticalCompactionDownsampleFilter: Downsampled block, see https://github.com/thanos-io/thanos/issues/6775",
+				v.markedForNoCompact,
+			); err != nil {
+				return nil, errors.Wrapf(err, "mark %v for no compaction", m.ULID.String())
+			}
+			noCompactMarked[m.ULID] = &metadata.NoCompactMark{ID: m.ULID, Version: metadata.NoCompactMarkVersion1}
+			marked = true
+		}
+
+		if marked {
+			continue PlanLoop
+		}
+
+		return plan, nil
+
+	}
+}
+
 // WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size.
 // When found, it marks block for no compaction by placing no-compact-mark.json and updating cache.
 // NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes.
@@ -243,16 +304,19 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
 	return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
 }
 
-func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
+func (t *largeTotalIndexSizeFilter) plan(ctx context.Context, extraNoCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
 	noCompactMarked := t.noCompBlocksFunc()
-	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
+	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)+len(extraNoCompactMarked))
 	for k, v := range noCompactMarked {
 		copiedNoCompactMarked[k] = v
 	}
+	for k, v := range extraNoCompactMarked {
+		copiedNoCompactMarked[k] = v
+	}
 
 PlanLoop:
 	for {
-		plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
+		plan, err := t.tsdbBasedPlanner.plan(copiedNoCompactMarked, metasByMinTime)
 		if err != nil {
 			return nil, err
 		}
@@ -303,3 +367,7 @@ PlanLoop:
 		return plan, nil
 	}
 }
+
+func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
+	return t.plan(ctx, nil, metasByMinTime)
+}
diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index 94a3f344fe..d65c7e23ea 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -984,7 +984,7 @@ func TestCompactorIssue6775(t *testing.T) {
 			return fmt.Errorf("reading metrics: %w", err)
 		}
 
-		if !bytes.Contains(b, []byte(`thanos_compact_downsample_duration_seconds_count{resolution="0"} 2`)) {
+		if !bytes.Contains(b, []byte(`thanos_compact_downsample_duration_seconds_count{group="0@14846485652960182170"} 2`)) {
 			return fmt.Errorf("failed to find the right downsampling metric")
 		}
 
@@ -998,6 +998,6 @@ func TestCompactorIssue6775(t *testing.T) {
 		Type:   client.S3,
 		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
 	}, nil, "--compact.enable-vertical-compaction")
-	testutil.NotOk(t, e2e.StartAndWaitReady(c))
-	testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics()))
+	testutil.Ok(t, e2e.StartAndWaitReady(c))
+	testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics()))
 }