From 8b7a71bcd8d9fd4a3c5f3d3a5ae75a594316251d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 12 Jul 2023 13:55:19 -0700 Subject: [PATCH 1/3] change shipper upload compacted type from bool to a function Signed-off-by: Ben Ye --- cmd/thanos/rule.go | 2 +- cmd/thanos/sidecar.go | 3 ++- pkg/receive/multitsdb.go | 2 +- pkg/shipper/shipper.go | 24 ++++++++++-------------- pkg/shipper/shipper_e2e_test.go | 9 ++++++--- pkg/shipper/shipper_test.go | 12 ++++++++---- 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index bc1f049de6..aa2aa73bf3 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -728,7 +728,7 @@ func runRule( } }() - s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, false, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) + s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, func() bool { return false }, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 4f8208ccde..116ae61888 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -341,8 +341,9 @@ func runSidecar( return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout) } + uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted } s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, - conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) + uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) return runutil.Repeat(30*time.Second, ctx.Done(), func() error { if uploaded, err := s.Sync(ctx); err != nil { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 349ceb98ef..e541723500 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -600,7 +600,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.bucket, func() labels.Labels { return lset }, metadata.ReceiveSource, - false, + func() bool { return false }, t.allowOutOfOrderUpload, t.hashFunc, ) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 2a2c04df2d..b07e62f206 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -40,7 +40,7 @@ type metrics struct { uploadedCompacted prometheus.Gauge } -func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { +func newMetrics(reg prometheus.Registerer) *metrics { var m metrics m.dirSyncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -59,15 +59,10 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { Name: "thanos_shipper_upload_failures_total", Help: "Total number of block upload failures", }) - uploadCompactedGaugeOpts := prometheus.GaugeOpts{ + m.uploadedCompacted = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "thanos_shipper_upload_compacted_done", Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.", - } - if uploadCompacted { - m.uploadedCompacted = promauto.With(reg).NewGauge(uploadCompactedGaugeOpts) - } else { - m.uploadedCompacted = promauto.With(nil).NewGauge(uploadCompactedGaugeOpts) - } + }) return &m } @@ -80,7 +75,7 @@ type Shipper struct { bucket objstore.Bucket source metadata.SourceType - uploadCompacted bool + uploadCompactedFunc func() bool allowOutOfOrderUploads bool hashFunc metadata.HashFunc @@ -98,7 +93,7 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, - uploadCompacted bool, + uploadCompactedFunc func() bool, allowOutOfOrderUploads bool, hashFunc metadata.HashFunc, ) *Shipper { @@ -114,10 +109,10 @@ func New( dir: dir, bucket: bucket, labels: lbls, - metrics: newMetrics(r, uploadCompacted), + metrics: newMetrics(r), source: source, allowOutOfOrderUploads: allowOutOfOrderUploads, - uploadCompacted: uploadCompacted, + uploadCompactedFunc: uploadCompactedFunc, hashFunc: hashFunc, } } @@ -272,6 +267,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { uploadErrs int ) + uploadCompacted := s.uploadCompactedFunc() metas, err := s.blockMetasFromOldest() if err != nil { return 0, err @@ -292,7 +288,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { // We only ship of the first compacted block level as normal flow. if m.Compaction.Level > 1 { - if !s.uploadCompacted { + if !uploadCompacted { continue } } @@ -339,7 +335,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs) } - if s.uploadCompacted { + if uploadCompacted { s.metrics.uploadedCompacted.Set(1) } return uploaded, nil diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 985ee4329f..5359742fa3 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -44,7 +44,8 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { dir := t.TempDir() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return false } + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -219,7 +220,8 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger)) - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return true } + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( @@ -374,8 +376,9 @@ func TestShipper_SyncOverlapBlocks_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger)) + uploadCompactedFunc := func() bool { return true } // Here, the allowOutOfOrderUploads flag is set to true, which allows blocks with overlaps to be uploaded. - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, true, metadata.NoneFunc) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, true, metadata.NoneFunc) // Creating 2 overlapping blocks - both uploaded when OOO uploads allowed. var ( diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 75a37afcc0..99c92ea2be 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -29,7 +29,8 @@ import ( func TestShipperTimestamps(t *testing.T) { dir := t.TempDir() - s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return false } + s := New(nil, nil, dir, nil, nil, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) // Missing thanos meta file. _, _, err := s.Timestamps() @@ -122,7 +123,8 @@ func TestIterBlockMetas(t *testing.T) { }, }.WriteToDir(log.NewNopLogger(), path.Join(dir, id3.String()))) - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return false } + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) metas, err := shipper.blockMetasFromOldest() testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { @@ -153,7 +155,8 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return false } + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) _, err := shipper.blockMetasFromOldest() testutil.Ok(b, err) @@ -165,7 +168,8 @@ func TestShipperAddsSegmentFiles(t *testing.T) { inmemory := objstore.NewInMemBucket() lbls := []labels.Label{{Name: "test", Value: "test"}} - s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return false } + s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) id := ulid.MustNew(1, nil) blockDir := path.Join(dir, id.String()) From 6e4e067398ef2f722adac3f2749ac6cf3b252c14 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 12 Jul 2023 13:59:47 -0700 Subject: [PATCH 2/3] add default to false Signed-off-by: Ben Ye --- cmd/thanos/rule.go | 2 +- pkg/receive/multitsdb.go | 2 +- pkg/shipper/shipper.go | 5 +++++ pkg/shipper/shipper_e2e_test.go | 3 +-- pkg/shipper/shipper_test.go | 12 ++++-------- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index aa2aa73bf3..346ae52fec 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -728,7 +728,7 @@ func runRule( } }() - s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, func() bool { return false }, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) + s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, nil, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index e541723500..2458e7123f 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -600,7 +600,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.bucket, func() labels.Labels { return lset }, metadata.ReceiveSource, - func() bool { return false }, + nil, t.allowOutOfOrderUpload, t.hashFunc, ) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index b07e62f206..15bda5eaa1 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -104,6 +104,11 @@ func New( lbls = func() labels.Labels { return nil } } + if uploadCompactedFunc == nil { + uploadCompactedFunc = func() bool { + return false + } + } return &Shipper{ logger: logger, dir: dir, diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 5359742fa3..5b95a2059b 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -44,8 +44,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { dir := t.TempDir() extLset := labels.FromStrings("prometheus", "prom-1") - uploadCompactedFunc := func() bool { return false } - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, nil, false, metadata.NoneFunc) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 99c92ea2be..96ded414f9 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -29,8 +29,7 @@ import ( func TestShipperTimestamps(t *testing.T) { dir := t.TempDir() - uploadCompactedFunc := func() bool { return false } - s := New(nil, nil, dir, nil, nil, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) + s := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc) // Missing thanos meta file. _, _, err := s.Timestamps() @@ -123,8 +122,7 @@ func TestIterBlockMetas(t *testing.T) { }, }.WriteToDir(log.NewNopLogger(), path.Join(dir, id3.String()))) - uploadCompactedFunc := func() bool { return false } - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc) metas, err := shipper.blockMetasFromOldest() testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { @@ -155,8 +153,7 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - uploadCompactedFunc := func() bool { return false } - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc) _, err := shipper.blockMetasFromOldest() testutil.Ok(b, err) @@ -168,8 +165,7 @@ func TestShipperAddsSegmentFiles(t *testing.T) { inmemory := objstore.NewInMemBucket() lbls := []labels.Label{{Name: "test", Value: "test"}} - uploadCompactedFunc := func() bool { return false } - s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) + s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, nil, false, metadata.NoneFunc) id := ulid.MustNew(1, nil) blockDir := path.Join(dir, id.String()) From 66aaa253d2c126a2f712e21d4c8b75bacc565d85 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 13 Jul 2023 00:07:59 -0700 Subject: [PATCH 3/3] reset uploadedCompacted to 0 Signed-off-by: Ben Ye --- pkg/shipper/shipper.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 15bda5eaa1..d3562d87b1 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -342,6 +342,8 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { if uploadCompacted { s.metrics.uploadedCompacted.Set(1) + } else { + s.metrics.uploadedCompacted.Set(0) } return uploaded, nil }