From 7cb350b4b91c4894a9cd61a815c66bbea4c0a098 Mon Sep 17 00:00:00 2001 From: shuaizhang Date: Thu, 20 Jun 2019 14:26:50 -0700 Subject: [PATCH] add dedup function in compactor --- cmd/thanos/compact.go | 20 + cmd/thanos/dedup.go | 66 +++ cmd/thanos/main.go | 1 + pkg/compact/compact.go | 18 +- pkg/compact/compact_test.go | 8 +- pkg/compact/dedup/dedup.go | 73 +++ pkg/compact/dedup/dedup_test.go | 153 ++++++ pkg/compact/dedup/merger.go | 486 ++++++++++++++++++ pkg/compact/dedup/merger_test.go | 119 +++++ pkg/compact/dedup/metrics.go | 85 +++ pkg/compact/dedup/reader.go | 212 ++++++++ pkg/compact/dedup/reader_test.go | 71 +++ pkg/compact/dedup/replica.go | 246 +++++++++ pkg/compact/dedup/replica_test.go | 137 +++++ pkg/compact/downsample/downsample.go | 7 +- .../downsample/streamed_block_writer.go | 13 +- pkg/query/iter.go | 4 +- pkg/query/querier_test.go | 4 +- 18 files changed, 1695 insertions(+), 28 deletions(-) create mode 100644 cmd/thanos/dedup.go create mode 100644 pkg/compact/dedup/dedup.go create mode 100644 pkg/compact/dedup/dedup_test.go create mode 100644 pkg/compact/dedup/merger.go create mode 100644 pkg/compact/dedup/merger_test.go create mode 100644 pkg/compact/dedup/metrics.go create mode 100644 pkg/compact/dedup/reader.go create mode 100644 pkg/compact/dedup/reader_test.go create mode 100644 pkg/compact/dedup/replica.go create mode 100644 pkg/compact/dedup/replica_test.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index d799dc50b95..0da82422f73 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -17,6 +17,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact" + "github.com/improbable-eng/thanos/pkg/compact/dedup" "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/client" @@ -110,6 +111,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). 
Default("1").Int() + enableDedup := cmd.Flag("enable-dedup", "Enable dedup function, but effect depends on 'dedup.replica-label' config").Default("false").Bool() + dedupReplicaLabel := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.").String() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, @@ -130,6 +134,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *maxCompactionLevel, *blockSyncConcurrency, *compactionConcurrency, + *enableDedup, + *dedupReplicaLabel, ) } } @@ -152,6 +158,8 @@ func runCompact( maxCompactionLevel int, blockSyncConcurrency int, concurrency int, + enableDedup bool, + dedupReplicaLabel string, ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", @@ -210,6 +218,7 @@ func runCompact( } var ( + dedupDir = path.Join(dataDir, "dedup") compactDir = path.Join(dataDir, "compact") downsamplingDir = path.Join(dataDir, "downsample") indexCacheDir = path.Join(dataDir, "index_cache") @@ -234,7 +243,14 @@ func runCompact( level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) } + deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, dedupReplicaLabel, consistencyDelay, blockSyncConcurrency) + f := func() error { + if isEnableDedup(enableDedup, dedupReplicaLabel) { + if err := deduper.Dedup(ctx); err != nil { + return errors.Wrap(err, "dedup failed") + } + } if err := compactor.Compact(ctx); err != nil { return errors.Wrap(err, "compaction failed") } @@ -443,3 +459,7 @@ func generateIndexCacheFile( } return nil } + +func isEnableDedup(enableDedup bool, dedupReplicaLabel string) bool { + return enableDedup && len(dedupReplicaLabel) > 0 +} diff --git a/cmd/thanos/dedup.go b/cmd/thanos/dedup.go new file mode 100644 index 00000000000..abf355c784c --- /dev/null +++ b/cmd/thanos/dedup.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "path/filepath" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/compact/dedup" + "github.com/improbable-eng/thanos/pkg/objstore/client" + "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/oklog/run" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/alecthomas/kingpin.v2" +) + +func registerDedup(m map[string]setupFunc, app *kingpin.Application, name string) { + cmd := app.Command(name, "continuously dedup blocks in an object store bucket") + + dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process deduplication."). + Default("./data").String() + + replicaLabel := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.").Required(). + String() + + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-dedup) blocks before they are being processed."). + Default("30m")) + + blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). 
+ Default("20").Int() + + objStoreConfig := regCommonObjStoreFlags(cmd, "", true) + + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + return runDedup(g, logger, reg, *dataDir, *replicaLabel, time.Duration(*consistencyDelay), *blockSyncConcurrency, objStoreConfig, name) + } +} + +func runDedup(g *run.Group, logger log.Logger, reg *prometheus.Registry, dataDir string, replicaLabel string, + consistencyDelay time.Duration, blockSyncConcurrency int, objStoreConfig *pathOrContent, component string) error { + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + + bkt, err := client.NewBucket(logger, confContentYaml, reg, component) + if err != nil { + if bkt != nil { + runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + } + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + dedupDir := filepath.Join(dataDir, "dedup") + g.Add(func() error { + defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + + deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, replicaLabel, consistencyDelay, blockSyncConcurrency) + return deduper.Dedup(ctx) + }, func(error) { + cancel() + }) + return nil +} diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 2a72a79d031..405258638f5 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -78,6 +78,7 @@ func main() { registerDownsample(cmds, app, "downsample") registerReceive(cmds, app, "receive") registerChecks(cmds, app, "check") + registerDedup(cmds, app, "dedup") cmd, err := app.Parse(os.Args[1:]) if err != nil { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 61a9f0be1dc..2c571559abb 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -231,14 +231,14 @@ func (c *Syncer) syncMetas(ctx context.Context) error { }) close(metaIDsChan) if err != nil { - return retry(errors.Wrap(err, "retrieve bucket block metas")) + return Retry(errors.Wrap(err, "retrieve bucket block metas")) } wg.Wait() close(errChan) if err := <-errChan; err != nil { - return retry(err) + return Retry(err) } // Delete all local block dirs that no longer exist in the bucket. 
@@ -459,7 +459,7 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { err := block.Delete(delCtx, c.bkt, id) cancel() if err != nil { - return retry(errors.Wrapf(err, "delete block %s from bucket", id)) + return Retry(errors.Wrapf(err, "delete block %s from bucket", id)) } // Immediately update our in-memory state so no further call to SyncMetas is needed @@ -633,7 +633,7 @@ type RetryError struct { err error } -func retry(err error) error { +func Retry(err error) error { if IsHaltError(err) { return err } @@ -716,7 +716,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir := filepath.Join(tmpdir, ie.id.String()) if err := block.Download(ctx, logger, bkt, ie.id, bdir); err != nil { - return retry(errors.Wrapf(err, "download block %s", ie.id)) + return Retry(errors.Wrapf(err, "download block %s", ie.id)) } meta, err := metadata.Read(bdir) @@ -736,7 +736,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, level.Info(logger).Log("msg", "uploading repaired block", "newID", resid) if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String())); err != nil { - return retry(errors.Wrapf(err, "upload of %s failed", resid)) + return Retry(errors.Wrapf(err, "upload of %s failed", resid)) } level.Info(logger).Log("msg", "deleting broken block", "id", ie.id) @@ -818,7 +818,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id)) + return false, ulid.ULID{}, Retry(errors.Wrapf(err, "download block %s", id)) } // Ensure all input blocks are valid. @@ -904,7 +904,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( begin = time.Now() if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) + return false, ulid.ULID{}, Retry(errors.Wrapf(err, "upload of %s failed", compID)) } level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin)) @@ -913,7 +913,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( // Eventually the block we just uploaded should get synced into the group again (including sync-delay). 
for _, b := range plan { if err := cg.deleteBlock(b); err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket")) + return false, ulid.ULID{}, Retry(errors.Wrapf(err, "delete old block from bucket")) } cg.groupGarbageCollectedBlocks.Inc() } diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 4639fff67af..c9786c32e57 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -57,16 +57,16 @@ func TestRetryError(t *testing.T) { err := errors.New("test") testutil.Assert(t, !IsRetryError(err), "retry error") - err = retry(errors.New("test")) + err = Retry(errors.New("test")) testutil.Assert(t, IsRetryError(err), "not a retry error") - err = errors.Wrap(retry(errors.New("test")), "something") + err = errors.Wrap(Retry(errors.New("test")), "something") testutil.Assert(t, IsRetryError(err), "not a retry error") - err = errors.Wrap(errors.Wrap(retry(errors.New("test")), "something"), "something2") + err = errors.Wrap(errors.Wrap(Retry(errors.New("test")), "something"), "something2") testutil.Assert(t, IsRetryError(err), "not a retry error") - err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2") + err = errors.Wrap(Retry(errors.Wrap(halt(errors.New("test")), "something")), "something2") testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error") } diff --git a/pkg/compact/dedup/dedup.go b/pkg/compact/dedup/dedup.go new file mode 100644 index 00000000000..f6198593945 --- /dev/null +++ b/pkg/compact/dedup/dedup.go @@ -0,0 +1,73 @@ +package dedup + +import ( + "context" + "os" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/pkg/errors" +) + +type BucketDeduper struct { + logger log.Logger + dedupDir string + replicaLabelName string + bkt objstore.Bucket + + metrics *DedupMetrics + + syncer *ReplicaSyncer + merger *ReplicaMerger +} + +func NewBucketDeduper(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, dedupDir, replicaLabelName string, + consistencyDelay time.Duration, blockSyncConcurrency int) *BucketDeduper { + metrics := NewDedupMetrics(reg) + return &BucketDeduper{ + logger: logger, + dedupDir: dedupDir, + replicaLabelName: replicaLabelName, + bkt: bkt, + metrics: metrics, + syncer: NewReplicaSyncer(logger, metrics, bkt, replicaLabelName, consistencyDelay, blockSyncConcurrency), + merger: NewReplicaMerger(logger, metrics, bkt, dedupDir, replicaLabelName), + } +} + +func (d *BucketDeduper) Dedup(ctx context.Context) error { + level.Info(d.logger).Log("msg", "start of deduplication") + start := time.Now() + if err := os.RemoveAll(d.dedupDir); err != nil { + return errors.Wrap(err, "clean up the dedup temporary directory") + } + if err := os.MkdirAll(d.dedupDir, 0777); err != nil { + return errors.Wrap(err, "create the dedup temporary directory") + } + + replicas, err := d.syncer.Sync(ctx) + if err != nil { + return errors.Wrap(err, "sync replica metas") + } + + groups := make(map[string]Replicas) + for _, r := range replicas { + group := r.Group() + groups[group] = append(groups[group], r) + } + for k, v := range groups { + level.Info(d.logger).Log("msg", "starting to dedup replicas", "group", k) + d.metrics.deduplication.WithLabelValues(d.bkt.Name()).Inc() + if err := d.merger.Merge(ctx, v); err != nil { + d.metrics.deduplicationFailures.WithLabelValues(d.bkt.Name(), k).Inc() + return 
errors.Wrapf(err, "merge replicas: %s", k) + } + level.Info(d.logger).Log("msg", "completed to dedup replicas", "group", k) + } + level.Info(d.logger).Log("msg", "deduplication process done", "duration", time.Since(start)) + return nil +} diff --git a/pkg/compact/dedup/dedup_test.go b/pkg/compact/dedup/dedup_test.go new file mode 100644 index 00000000000..ea6ae560694 --- /dev/null +++ b/pkg/compact/dedup/dedup_test.go @@ -0,0 +1,153 @@ +package dedup + +import ( + "context" + "fmt" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" + "github.com/prometheus/tsdb/labels" +) + +func TestBucketDeduper_Dedup(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + logger := log.NewNopLogger() + dataDir, err := ioutil.TempDir("", "thanos-dedup") + testutil.Ok(t, err) + reg := prometheus.NewRegistry() + bkt := mockObjectStoreBucket(t, ctx, logger) + + rawLabelSeries := getBucketTimeSeries(t, ctx, logger, bkt) + + deduper := NewBucketDeduper(logger, reg, bkt, dataDir, "replica", 0, 1) + + err = deduper.Dedup(ctx) + testutil.Ok(t, err) + + newLabelSeries := getBucketTimeSeries(t, ctx, logger, bkt) + + testutil.Assert(t, len(newLabelSeries) == len(rawLabelSeries), "dedup failed") + for k := range newLabelSeries { + rawDataPoints := rawLabelSeries[k] + newDataPoints := newLabelSeries[k] + testutil.Assert(t, len(rawDataPoints) == len(newDataPoints), "dedup failed") + for timestamp, values := range newDataPoints { + testutil.Assert(t, len(values) == 1, "dedup failed") + rawValues := rawDataPoints[timestamp] + found := false + for _, rv := range rawValues { + if rv == values[0] { + found = true + break + } + } + testutil.Assert(t, found, "dedup failed") + } + } + + replicas := getBucketReplicas(t, ctx, logger, bkt) + testutil.Assert(t, len(replicas) == 1, "dedup failed") + for _, replica := range replicas { + testutil.Assert(t, replica.isAggReplica(), "dedup failed") + } +} + +func getBucketReplicas(t *testing.T, ctx context.Context, logger log.Logger, bkt objstore.Bucket) Replicas { + syncer := NewReplicaSyncer(logger, NewDedupMetrics(prometheus.NewRegistry()), bkt, "replica", 0, 1) + replicas, err := syncer.Sync(ctx) + testutil.Ok(t, err) + return replicas +} + +func getBucketTimeSeries(t *testing.T, ctx context.Context, logger log.Logger, bkt objstore.Bucket) map[string]map[int64][]float64 { + dataDir, err := ioutil.TempDir("", "thanos-dedup-"+string(rand.Int())) + testutil.Ok(t, err) + + replicas := getBucketReplicas(t, ctx, logger, bkt) + result := make(map[string]map[int64][]float64) + for _, replica := range replicas { + for _, b := range replica.Blocks { + blockDir := filepath.Join(dataDir, b.ULID.String()) + err = os.RemoveAll(blockDir) + testutil.Ok(t, err) + err = block.Download(ctx, logger, bkt, b.ULID, blockDir) + testutil.Ok(t, err) + labelSamples := getBlockSampleSeries(t, logger, blockDir) + for k, samples := range labelSamples { + if _, ok := result[k]; !ok { + result[k] = make(map[int64][]float64) + } + for _, v := range samples { + if _, ok := result[k][v.timestamp]; !ok { + result[k][v.timestamp] = make([]float64, 0) + } + 
result[k][v.timestamp] = append(result[k][v.timestamp], v.value) + } + } + } + } + return result +} + +func getBlockSampleSeries(t *testing.T, logger log.Logger, blockDir string) map[string][]*Sample { + b, err := tsdb.OpenBlock(logger, blockDir, chunkenc.NewPool()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, b.Close()) + }() + ir, err := b.Index() + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, ir.Close()) + }() + cr, err := b.Chunks() + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, cr.Close()) + }() + postings, err := ir.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + postings = ir.SortedPostings(postings) + + result := make(map[string][]*Sample) + var lset labels.Labels + var chks []chunks.Meta + for postings.Next() { + lset = lset[:0] + chks = chks[:0] + err := ir.Series(postings.At(), &lset, &chks) + testutil.Ok(t, err) + key := fmt.Sprint(lset) + var samples []*Sample + for _, c := range chks { + chk, _ := cr.Chunk(c.Ref) + iterator := chk.Iterator() + for iterator.Next() { + timestamp, value := iterator.At() + samples = append(samples, &Sample{ + timestamp: timestamp, + value: value, + }) + } + } + if _, ok := result[key]; !ok { + result[key] = make([]*Sample, 0) + } + result[key] = append(result[key], samples...) + } + return result +} diff --git a/pkg/compact/dedup/merger.go b/pkg/compact/dedup/merger.go new file mode 100644 index 00000000000..b1cd9e75c9c --- /dev/null +++ b/pkg/compact/dedup/merger.go @@ -0,0 +1,486 @@ +package dedup + +import ( + "context" + "fmt" + "math/rand" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/improbable-eng/thanos/pkg/compact" + "github.com/improbable-eng/thanos/pkg/compact/downsample" + "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/improbable-eng/thanos/pkg/query" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/labels" +) + +type TimeWindow struct { + MinTime int64 + MaxTime int64 +} + +func (tw *TimeWindow) String() string { + return fmt.Sprintf("[%d, %d]", tw.MinTime, tw.MaxTime) +} + +func NewTimeWindow(minTime, maxTime int64) *TimeWindow { + return &TimeWindow{MinTime: minTime, MaxTime: maxTime} +} + +// Group blocks under the same time window from different replicas +type BlockGroup struct { + window *TimeWindow + blocks []*metadata.Meta +} + +func (g *BlockGroup) String() string { + builder := strings.Builder{} + builder.WriteString("[") + for i, b := range g.blocks { + if i != 0 { + builder.WriteString(",") + } + builder.WriteString(b.ULID.String()) + } + builder.WriteString("]") + return fmt.Sprintf("BlockGroup{window: %s, blocks: %s}", g.window, builder.String()) +} + +func NewBlockGroup(window *TimeWindow, blocks []*metadata.Meta) *BlockGroup { + return &BlockGroup{window: window, blocks: blocks} +} + +type BlockGroups []*BlockGroup + +func NewBlockGroups(replicas Replicas) BlockGroups { + if len(replicas) == 0 { + return nil + } + blocks := make([]*metadata.Meta, 0) + for _, v := range replicas { + blocks = append(blocks, v.Blocks...) 
+ } + // Prefer to use larger time window to group blocks, best effort to not break the compacted blocks + // If two blocks with same duration, prefer to handle the one with smaller minTime firstly + sort.Slice(blocks, func(i, j int) bool { + d1 := blocks[i].MaxTime - blocks[i].MinTime + d2 := blocks[j].MaxTime - blocks[j].MinTime + if d1 == d2 { + return blocks[i].MinTime < blocks[j].MinTime + } + return d1 > d2 + }) + groups := make(BlockGroups, 0) + covered := make([]*TimeWindow, 0) + for _, b := range blocks { + tw := getUncoveredTimeWindow(covered, b) + if tw == nil { + continue + } + groups = append(groups, getBlockGroup(blocks, tw)) + covered = append(covered, tw) + } + sort.Slice(groups, func(i, j int) bool { + return groups[i].window.MinTime < groups[j].window.MinTime + }) + return groups +} + +func getUncoveredTimeWindow(covered []*TimeWindow, b *metadata.Meta) *TimeWindow { + minTime := b.MinTime + maxTime := b.MaxTime + for _, v := range covered { + if minTime >= v.MinTime && minTime < v.MaxTime { + minTime = v.MaxTime + } + if maxTime > v.MinTime && maxTime <= v.MaxTime { + maxTime = v.MinTime + } + if minTime >= maxTime { + return nil + } + } + return NewTimeWindow(minTime, maxTime) +} + +func getBlockGroup(blocks []*metadata.Meta, tw *TimeWindow) *BlockGroup { + target := make([]*metadata.Meta, 0) + for _, b := range blocks { + if b.MaxTime <= tw.MinTime || b.MinTime >= tw.MaxTime { + continue + } + target = append(target, b) + } + return NewBlockGroup(tw, target) +} + +type ReplicaMerger struct { + logger log.Logger + metrics *DedupMetrics + bkt objstore.Bucket + dir string + replicaLabel string +} + +func NewReplicaMerger(logger log.Logger, metrics *DedupMetrics, bkt objstore.Bucket, dir string, replicaLabel string) *ReplicaMerger { + return &ReplicaMerger{ + logger: logger, + metrics: metrics, + bkt: bkt, + dir: dir, + replicaLabel: replicaLabel, + } +} + +func (rm *ReplicaMerger) Merge(ctx context.Context, replicas Replicas) error { + groups := rm.plan(ctx, replicas) + + for _, group := range groups { + if err := rm.prepare(ctx, group); err != nil { + return errors.Wrapf(err, "prepare phase of group: %s", group) + } + id, err := rm.merge(ctx, group) + if err != nil { + return errors.Wrapf(err, "merge phase of group: %s", group) + } + if err := rm.upload(ctx, group, id); err != nil { + return errors.Wrapf(err, "upload phase of group: %s", group) + } + if err := rm.clean(ctx, group, id); err != nil { + return errors.Wrapf(err, "clean phase of group: %s", group) + } + } + return nil +} + +func (rm *ReplicaMerger) plan(ctx context.Context, replicas Replicas) BlockGroups { + if len(replicas) < 2 { + return nil + } + groups := NewBlockGroups(replicas) + target := make(BlockGroups, 0, len(groups)) + for _, group := range groups { + // if the group only includes less than 2 blocks, then skip it + if len(group.blocks) < 2 { + continue + } + target = append(target, group) + } + return target +} + +func (rm *ReplicaMerger) prepare(ctx context.Context, group *BlockGroup) error { + var wg sync.WaitGroup + defer wg.Wait() + + mCtx, cancel := context.WithCancel(ctx) + defer cancel() + + errChan := make(chan error, len(group.blocks)) + + for _, b := range group.blocks { + wg.Add(1) + go func(b *metadata.Meta) { + defer wg.Done() + rm.metrics.syncBlocks.WithLabelValues(rm.bkt.Name()).Inc() + begin := time.Now() + err := rm.download(mCtx, b) + rm.metrics.syncBlockDuration.WithLabelValues(rm.bkt.Name()).Observe(time.Since(begin).Seconds()) + if err != nil { + 
rm.metrics.syncMetaFailures.WithLabelValues(rm.bkt.Name(), b.ULID.String()).Inc() + errChan <- err + } + }(b) + } + + wg.Wait() + close(errChan) + + if err := <-errChan; err != nil { + return err + } + return nil +} + +func (rm *ReplicaMerger) download(ctx context.Context, b *metadata.Meta) error { + blockDir := filepath.Join(rm.dir, b.ULID.String()) + if err := rm.deleteLocalBlock(&b.ULID); err != nil { + return compact.Retry(errors.Wrapf(err, "clean up block dir: %s", blockDir)) + } + err := block.Download(ctx, rm.logger, rm.bkt, b.ULID, blockDir) + if err != nil { + rm.metrics.operateRemoteStorageFailures.WithLabelValues("get", rm.bkt.Name(), b.ULID.String()).Inc() + return compact.Retry(errors.Wrapf(err, "download block %s", b.ULID)) + } + level.Debug(rm.logger).Log("msg", "downloaded block from remote bucket", "block", b.ULID) + return nil +} + +func (rm *ReplicaMerger) merge(ctx context.Context, group *BlockGroup) (*ulid.ULID, error) { + if len(group.blocks) == 0 { + return nil, nil + } + baseBlock := group.blocks[0] + readers := make([]*BlockReader, 0, len(group.blocks)) + + defer func() { + for _, reader := range readers { + if err := reader.Close(); err != nil { + level.Warn(rm.logger).Log("msg", "failed to close block reader", "err", err) + } + } + }() + + for _, b := range group.blocks { + blockDir := filepath.Join(rm.dir, b.ULID.String()) + reader, err := NewBlockReader(rm.logger, blockDir) + if err != nil { + if err := reader.Close(); err != nil { + level.Warn(rm.logger).Log("msg", "failed to close block reader", "err", err) + } + return nil, err + } + readers = append(readers, reader) + } + + newId := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) + newMeta := rm.newMeta(baseBlock, newId, group.window) + blockDir := filepath.Join(rm.dir, newMeta.ULID.String()) + + if err := rm.write(readers, blockDir, newMeta, group.window); err != nil { + return nil, err + } + return &newId, nil +} + +func (rm *ReplicaMerger) newMeta(baseMeta *metadata.Meta, newId ulid.ULID, tw *TimeWindow) *metadata.Meta { + newMeta := *baseMeta + newMeta.ULID = newId + newMeta.MinTime = tw.MinTime + newMeta.MaxTime = tw.MaxTime + newSources := make([]ulid.ULID, 0, len(newMeta.Compaction.Sources)) + var hasOldId bool + for _, source := range newMeta.Compaction.Sources { + if source == baseMeta.ULID { + hasOldId = true + continue + } + newSources = append(newSources, source) + } + if hasOldId { + newSources = append(newSources, newId) + } + newMeta.Compaction.Sources = newSources + newMeta.Thanos.Labels[rm.replicaLabel] = AggReplicaLabel + return &newMeta +} + +func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *metadata.Meta, tw *TimeWindow) error { + symbols, err := rm.getMergedSymbols(readers) + if err != nil { + return err + } + writer, err := downsample.NewStreamedBlockWriter(blockDir, symbols, rm.logger, *meta) + if err != nil { + return err + } + + buf := make([]*SampleReader, len(readers), len(readers)) + + running := true + for running { + running = false + + for i, reader := range readers { + if buf[i] != nil { + running = true + continue + } + hasNext := reader.postings.Next() + if !hasNext { + continue + } + var lset labels.Labels + var chks []chunks.Meta + if err := reader.ir.Series(reader.postings.At(), &lset, &chks); err != nil { + return err + } + buf[i] = NewSampleReader(reader.cr, lset, chks) + running = true + } + + cs, err := rm.getMergedChunkSeries(buf, tw) + if err != nil { + return err + } + + if cs == nil { + continue + } + + if err 
:= writer.WriteSeries(cs.lset, cs.chks); err != nil { + return err + } + + for i, v := range buf { + if v == nil { + continue + } + if labels.Compare(v.lset, cs.lset) == 0 { + buf[i] = nil + } + } + } + + if err := writer.Close(); err != nil { + return err + } + return nil +} + +func (rm *ReplicaMerger) getMergedSymbols(readers []*BlockReader) (map[string]struct{}, error) { + result := make(map[string]struct{}) + for _, reader := range readers { + symbols, err := reader.Symbols() + if err != nil { + return nil, err + } + for k := range symbols { + if _, ok := result[k]; !ok { + result[k] = struct{}{} + } + } + } + return result, nil +} + +func (rm *ReplicaMerger) getMergedChunkSeries(readers []*SampleReader, tw *TimeWindow) (*ChunkSeries, error) { + buf := make([]*SampleReader, len(readers)) + copy(buf, readers) + + sort.Slice(buf, func(i, j int) bool { + if buf[i] == nil { + return false + } + if buf[j] == nil { + return true + } + return labels.Compare(buf[i].lset, buf[j].lset) < 0 + }) + + if buf[0] == nil { + return nil, nil + } + + lset := buf[0].lset + samples, err := buf[0].Read(tw) + if err != nil { + return nil, err + } + it := query.NewDedupSeriesIterator(NewSampleIterator(nil), NewSampleIterator(samples)) + for i := 1; i < len(buf); i++ { + if buf[i] == nil { + break + } + if labels.Compare(buf[i].lset, lset) != 0 { + break + } + ss, err := buf[i].Read(tw) + if err != nil { + return nil, err + } + if len(ss) == 0 { + continue + } + it = query.NewDedupSeriesIterator(it, NewSampleIterator(ss)) + } + + return NewSampleSeries(lset, rm.getMergedSamples(it)).ToChunkSeries() +} + +func (rm *ReplicaMerger) getMergedSamples(it storage.SeriesIterator) []*Sample { + samples := make([]*Sample, 0) + for it.Next() { + t, v := it.At() + samples = append(samples, NewSample(t, v)) + } + return samples +} + +func (rm *ReplicaMerger) upload(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error { + blockDir := filepath.Join(rm.dir, newId.String()) + if err := block.VerifyIndex(rm.logger, filepath.Join(blockDir, block.IndexFilename), group.window.MinTime, group.window.MaxTime); err != nil { + return errors.Wrapf(err, "agg block index not valid: %s", newId) + } + level.Debug(rm.logger).Log("msg", "verified agg block index", "block", newId, "dir", blockDir) + if err := block.Upload(ctx, rm.logger, rm.bkt, blockDir); err != nil { + rm.metrics.operateRemoteStorageFailures.WithLabelValues("upload", rm.bkt.Name(), newId.String()).Inc() + return compact.Retry(errors.Wrapf(err, "upload of %s failed", newId)) + } + level.Debug(rm.logger).Log("msg", "uploaded agg block to remote bucket", "block", newId, "dir", blockDir) + return nil +} + +func (rm *ReplicaMerger) clean(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error { + // delete blocks in remote storage + for _, b := range group.blocks { + if b.MaxTime > group.window.MaxTime { + continue + } + if err := rm.deleteRemoteBlock(&b.ULID); err != nil { + return compact.Retry(errors.Wrapf(err, "delete block %s from bucket", b.ULID.String())) + } + } + + // delete blocks in local storage + if err := rm.deleteLocalBlock(newId); err != nil { + return compact.Retry(errors.Wrapf(err, "delete agg block: %s", newId.String())) + } + + for _, b := range group.blocks { + if err := rm.deleteLocalBlock(&b.ULID); err != nil { + return compact.Retry(errors.Wrapf(err, "delete merged block: %s", newId.String())) + } + } + + return nil +} + +func (rm *ReplicaMerger) deleteRemoteBlock(id *ulid.ULID) error { + if id == nil { + return nil + } + ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + if err := block.Delete(ctx, rm.bkt, *id); err != nil { + rm.metrics.operateRemoteStorageFailures.WithLabelValues("delete", rm.bkt.Name(), id.String()).Inc() + return err + } + level.Debug(rm.logger).Log("msg", "deleted remote block", "block", id.String()) + return nil +} + +func (rm *ReplicaMerger) deleteLocalBlock(id *ulid.ULID) error { + if id == nil { + return nil + } + blockDir := filepath.Join(rm.dir, id.String()) + if err := os.RemoveAll(blockDir); err != nil { + rm.metrics.operateLocalStorageFailures.WithLabelValues("delete", id.String()).Inc() + return err + } + level.Debug(rm.logger).Log("msg", "deleted local block", "block", blockDir) + return nil +} \ No newline at end of file diff --git a/pkg/compact/dedup/merger_test.go b/pkg/compact/dedup/merger_test.go new file mode 100644 index 00000000000..7e22c014063 --- /dev/null +++ b/pkg/compact/dedup/merger_test.go @@ -0,0 +1,119 @@ +package dedup + +import ( + "context" + "io/ioutil" + "reflect" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/client_golang/prometheus" +) + +func TestNewBlockGroups(t *testing.T) { + input := []struct { + blocks []*metadata.Meta + }{ + { + blocks: []*metadata.Meta{ + mockMeta(t, "s0", "r0", 0, 100, 200), + mockMeta(t, "s0", "r1", 0, 100, 200), + }, + }, + { + blocks: []*metadata.Meta{ + mockMeta(t, "s0", "r0", 0, 200, 300), + mockMeta(t, "s0", "r1", 0, 100, 200), + mockMeta(t, "s0", "r1", 0, 200, 300), + }, + }, + { + blocks: []*metadata.Meta{ + mockMeta(t, "s0", "r0", 0, 100, 200), + mockMeta(t, "s0", "r0", 0, 200, 300), + mockMeta(t, "s0", "r0", 0, 300, 400), + mockMeta(t, "s0", "r1", 0, 200, 400), + }, + }, + { + blocks: []*metadata.Meta{ + mockMeta(t, "s0", "r0", 0, 100, 300), + mockMeta(t, "s0", "r0", 0, 300, 400), + mockMeta(t, "s0", "r1", 0, 200, 400), + }, + }, + } + + expected := []struct { + length int + windows []*TimeWindow + blockNums []int + }{ + { + length: 1, + windows: []*TimeWindow{ + NewTimeWindow(100, 200), + }, + blockNums: []int{2}, + }, + { + length: 2, + windows: []*TimeWindow{ + NewTimeWindow(100, 200), + NewTimeWindow(200, 300), + }, + blockNums: []int{1, 2}, + }, + { + length: 2, + windows: []*TimeWindow{ + NewTimeWindow(100, 200), + NewTimeWindow(200, 400), + }, + blockNums: []int{1, 3}, + }, + { + length: 2, + windows: []*TimeWindow{ + NewTimeWindow(100, 300), + NewTimeWindow(300, 400), + }, + blockNums: []int{2, 2}, + }, + } + + for i, v := range input { + replicas, err := NewReplicas("replica", v.blocks) + testutil.Ok(t, err) + groups := NewBlockGroups(replicas) + testutil.Assert(t, len(groups) == expected[i].length, "new block groups failed") + for j, g := range groups { + testutil.Assert(t, reflect.DeepEqual(g.window, expected[i].windows[j]), "new block groups failed") + testutil.Assert(t, len(g.blocks) == expected[i].blockNums[j], "new block groups failed") + + } + } +} + +func TestReplicaMerger_Merge(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + logger := log.NewNopLogger() + dataDir, err := ioutil.TempDir("", "thanos-dedup-merger") + testutil.Ok(t, err) + metrics := NewDedupMetrics(prometheus.NewRegistry()) + bkt := mockObjectStoreBucket(t, ctx, logger) + + replicas := getBucketReplicas(t, ctx, logger, bkt) + testutil.Assert(t, len(replicas) == 2, "merge failed") + testutil.Assert(t, 
len(replicas[0].Blocks) > 0, "merge failed") + testutil.Assert(t, len(replicas[1].Blocks) > 0, "merge failed") + + merger := NewReplicaMerger(logger, metrics, bkt, dataDir, "replica") + + err = merger.Merge(ctx, replicas) + testutil.Ok(t, err) +} diff --git a/pkg/compact/dedup/metrics.go b/pkg/compact/dedup/metrics.go new file mode 100644 index 00000000000..8c9bb5f0dd6 --- /dev/null +++ b/pkg/compact/dedup/metrics.go @@ -0,0 +1,85 @@ +package dedup + +import "github.com/prometheus/client_golang/prometheus" + +type DedupMetrics struct { + deduplication *prometheus.CounterVec + deduplicationFailures *prometheus.CounterVec + + syncMetas *prometheus.CounterVec + syncMetaFailures *prometheus.CounterVec + syncMetaDuration *prometheus.HistogramVec + + syncBlocks *prometheus.CounterVec + syncBlockFailures *prometheus.CounterVec + syncBlockDuration *prometheus.HistogramVec + + operateLocalStorageFailures *prometheus.CounterVec + operateRemoteStorageFailures *prometheus.CounterVec +} + +func NewDedupMetrics(reg prometheus.Registerer) *DedupMetrics { + metrics := &DedupMetrics{ + deduplication: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_bucket_deduplication_total", + Help: "Total number of bucket deduplication attempts.", + }, []string{"bucket"}), + deduplicationFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_bucket_deduplication_failures", + Help: "Total number of failed bucket deduplication.", + }, []string{"bucket", "group"}), + + syncMetas: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_sync_meta_total", + Help: "Total number of sync meta operations.", + }, []string{"bucket"}), + syncMetaFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_sync_meta_failures", + Help: "Total number of failed sync meta operations.", + }, []string{"bucket", "block"}), + syncMetaDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_dedup_sync_meta_duration_seconds", + Help: "Time it took to sync meta files.", + Buckets: []float64{ + 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 30, 60, 100, 200, 500, + }, + }, []string{"bucket"}), + + syncBlocks: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_sync_block_total", + Help: "Total number of sync block operations.", + }, []string{"bucket"}), + syncBlockFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_sync_block_failures", + Help: "Total number of failed sync block operations", + }, []string{"bucket", "block"}), + syncBlockDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_dedup_sync_block_duration_seconds", + Help: "Time it took to sync block files.", + Buckets: []float64{ + 0.25, 0.6, 1, 2, 3.5, 5, 7.5, 10, 15, 30, 60, 100, 200, 500, + }, + }, []string{"bucket"}), + + operateLocalStorageFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_operate_local_storage_failures", + Help: "Total number of failed operate local storage operations", + }, []string{"operation", "block"}), + operateRemoteStorageFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_dedup_operate_remote_storage_failures", + Help: "Total number of failed operate remote storage operations", + }, []string{"operation", "bucket", "block"}), + } + reg.MustRegister( + metrics.deduplication, + metrics.deduplicationFailures, + metrics.syncMetas, + metrics.syncMetaFailures, + metrics.syncMetaDuration, + metrics.syncBlocks, + metrics.syncBlockFailures, + 
metrics.syncBlockDuration, + metrics.operateLocalStorageFailures, + metrics.operateRemoteStorageFailures) + return metrics +} diff --git a/pkg/compact/dedup/reader.go b/pkg/compact/dedup/reader.go new file mode 100644 index 00000000000..f79bb0ccc42 --- /dev/null +++ b/pkg/compact/dedup/reader.go @@ -0,0 +1,212 @@ +package dedup + +import ( + "io" + "math" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" + "github.com/prometheus/tsdb/chunks" + tsdberrors "github.com/prometheus/tsdb/errors" + "github.com/prometheus/tsdb/index" + "github.com/prometheus/tsdb/labels" +) + +type ChunkSeries struct { + lset labels.Labels + chks []chunks.Meta +} + +type Sample struct { + timestamp int64 + value float64 +} + +func NewSample(timestamp int64, value float64) *Sample { + return &Sample{timestamp: timestamp, value: value} +} + +type SampleIterator struct { + samples []*Sample + i int +} + +func NewSampleIterator(samples []*Sample) *SampleIterator { + return &SampleIterator{samples: samples} +} + +func (s *SampleIterator) Err() error { + return nil +} + +func (s *SampleIterator) At() (int64, float64) { + return s.samples[s.i].timestamp, s.samples[s.i].value +} + +func (s *SampleIterator) Next() bool { + if s.i >= len(s.samples) { + return false + } + s.i++ + return true +} + +func (s *SampleIterator) Seek(t int64) bool { + if s.i < 0 { + s.i = 0 + } + for { + if s.i >= len(s.samples) { + return false + } + if s.samples[s.i].timestamp >= t { + return true + } + s.i++ + } +} + +type SampleSeries struct { + lset labels.Labels + samples []*Sample +} + +func NewSampleSeries(lset labels.Labels, samples []*Sample) *SampleSeries { + return &SampleSeries{ + lset: lset, + samples: samples, + } +} + +func (ss *SampleSeries) ToChunkSeries() (*ChunkSeries, error) { + if len(ss.samples) == 0 { + return nil, nil + } + + chunk := chunkenc.NewXORChunk() + appender, err := chunk.Appender() + if err != nil { + return nil, err + } + minTime := int64(math.MaxInt64) + maxTime := int64(math.MinInt64) + for _, v := range ss.samples { + if minTime > v.timestamp { + minTime = v.timestamp + } + if maxTime < v.timestamp { + maxTime = v.timestamp + } + appender.Append(v.timestamp, v.value) + } + return &ChunkSeries{ + lset: ss.lset, + chks: []chunks.Meta{ + { + Chunk: chunk, + MinTime: minTime, + MaxTime: maxTime, + }, + }, + }, nil +} + +type SampleReader struct { + cr tsdb.ChunkReader + lset labels.Labels + chks []chunks.Meta +} + +func NewSampleReader(cr tsdb.ChunkReader, lset labels.Labels, chks []chunks.Meta) *SampleReader { + return &SampleReader{ + cr: cr, + lset: lset, + chks: chks, + } +} + +func (r *SampleReader) Read(tw *TimeWindow) ([]*Sample, error) { + samples := make([]*Sample, 0) + for _, c := range r.chks { + chk, err := r.cr.Chunk(c.Ref) + if err != nil { + return nil, errors.Wrapf(err, "get chunk %d", c.Ref) + } + iterator := chk.Iterator() + for iterator.Next() { + timestamp, value := iterator.At() + if timestamp < tw.MinTime { + continue + } + // Ignore the data point which timestamp is same with MaxTime. 
+ // Make sure the block use scope [MinTime, MaxTime) instead of [MinTime, MaxTime] + if timestamp >= tw.MaxTime { + break + } + samples = append(samples, &Sample{ + timestamp: timestamp, + value: value, + }) + } + } + return samples, nil +} + +type BlockReader struct { + logger log.Logger + closers []io.Closer + + ir tsdb.IndexReader + cr tsdb.ChunkReader + + postings index.Postings +} + +func NewBlockReader(logger log.Logger, blockDir string) (*BlockReader, error) { + reader := &BlockReader{ + logger: logger, + closers: make([]io.Closer, 0, 3), + } + + b, err := tsdb.OpenBlock(logger, blockDir, chunkenc.NewPool()) + if err != nil { + return reader, errors.Wrapf(err, "open block under dir %s", blockDir) + } + reader.closers = append(reader.closers, b) + + ir, err := b.Index() + if err != nil { + return reader, errors.Wrap(err, "open index") + } + reader.ir = ir + reader.closers = append(reader.closers, ir) + + cr, err := b.Chunks() + if err != nil { + return reader, errors.Wrap(err, "open chunks") + } + reader.cr = cr + reader.closers = append(reader.closers, cr) + + postings, err := ir.Postings(index.AllPostingsKey()) + if err != nil { + return reader, errors.Wrap(err, "read index postings") + } + reader.postings = ir.SortedPostings(postings) + + return reader, nil +} + +func (r *BlockReader) Symbols() (map[string]struct{}, error) { + return r.ir.Symbols() +} + +func (r *BlockReader) Close() error { + var merr tsdberrors.MultiError + for i := len(r.closers) - 1; i >= 0; i-- { + merr.Add(r.closers[i].Close()) + } + return errors.Wrap(merr.Err(), "close closers") +} diff --git a/pkg/compact/dedup/reader_test.go b/pkg/compact/dedup/reader_test.go new file mode 100644 index 00000000000..5a5413743d3 --- /dev/null +++ b/pkg/compact/dedup/reader_test.go @@ -0,0 +1,71 @@ +package dedup + +import ( + "context" + "io/ioutil" + "math/rand" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/tsdb/labels" +) + +func TestSampleSeries_ToChunkSeries(t *testing.T) { + series := &SampleSeries{ + lset: labels.Labels{ + {Name: "b", Value: "1"}, + {Name: "a", Value: "1"}, + }, + samples: []*Sample{ + {timestamp: 7, value: rand.Float64()}, + {timestamp: 5, value: rand.Float64()}, + {timestamp: 9, value: rand.Float64()}, + {timestamp: 1, value: rand.Float64()}, + {timestamp: 2, value: rand.Float64()}, + {timestamp: 6, value: rand.Float64()}, + }, + } + + chunkSeries, err := series.ToChunkSeries() + testutil.Ok(t, err) + testutil.Assert(t, len(chunkSeries.chks) == 1, "chunk series conversion failed") + testutil.Assert(t, chunkSeries.chks[0].MinTime == 1, "chunk series conversion failed") + testutil.Assert(t, chunkSeries.chks[0].MaxTime == 9, "chunk series conversion failed") +} + +func TestNewBlockReader(t *testing.T) { + reader := createBlockReader(t) + testutil.Assert(t, reader != nil, "new block reader failed") + testutil.Assert(t, reader.ir != nil, "new block reader failed") + testutil.Assert(t, reader.cr != nil, "new block reader failed") + testutil.Assert(t, reader.postings != nil, "new block reader failed") + testutil.Assert(t, len(reader.closers) == 3, "new block reader failed") +} + +func TestBlockReader_Symbols(t *testing.T) { + reader := createBlockReader(t) + symbols, err := reader.Symbols() + testutil.Ok(t, err) + testutil.Assert(t, len(symbols) > 0, "new block reader failed") +} + +func TestBlockReader_Close(t *testing.T) { + reader := createBlockReader(t) + err := reader.Close() + testutil.Ok(t, err) +} + +func 
createBlockReader(t *testing.T) *BlockReader { + dataDir, err := ioutil.TempDir("", "thanos-dedup-streamed-block-reader") + testutil.Ok(t, err) + id := createBlock(t, context.Background(), dataDir, "r0") + + blockDir := filepath.Join(dataDir, id.String()) + + reader, err := NewBlockReader(log.NewNopLogger(), blockDir) + testutil.Ok(t, err) + + return reader +} diff --git a/pkg/compact/dedup/replica.go b/pkg/compact/dedup/replica.go new file mode 100644 index 00000000000..32a64c3110e --- /dev/null +++ b/pkg/compact/dedup/replica.go @@ -0,0 +1,246 @@ +package dedup + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/improbable-eng/thanos/pkg/compact" + "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/oklog/ulid" + "github.com/pkg/errors" +) + +const ( + AggReplicaLabel = "_agg_replica_" + ResolutionLabel = "resolution" +) + +// Replica defines a group of blocks from same global configs and resolution. +// Ex, replica{name=r0,labels=[cluster=c0, shard=s0,resolution=0],blocks=[b0, b1]} +type Replica struct { + Name string // the value specified by replica label + Labels map[string]string // includes resolution and global configs + Blocks []*metadata.Meta // the underlying blocks +} + +func NewReplica(name string, labels map[string]string, blocks []*metadata.Meta) *Replica { + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].MinTime < blocks[j].MinTime + }) + return &Replica{ + Name: name, + Labels: labels, + Blocks: blocks, + } +} + +func (r *Replica) MinTime() int64 { + if len(r.Blocks) == 0 { + return -1 + } + return r.Blocks[0].MinTime +} + +func (r *Replica) MaxTime() int64 { + if len(r.Blocks) == 0 { + return -1 + } + return r.Blocks[len(r.Blocks)-1].MaxTime +} + +func (r *Replica) Group() string { + return replicaGroup(r.Labels) +} + +func (r *Replica) isAggReplica() bool { + return r.Name == AggReplicaLabel +} + +func replicaGroup(labels map[string]string) string { + names := make([]string, 0, len(labels)) + for k := range labels { + names = append(names, k) + } + sort.Slice(names, func(i, j int) bool { + return names[i] < names[j] + }) + var b strings.Builder + b.WriteString("[") + for i, name := range names { + if i != 0 { + b.WriteString(",") + } + b.WriteString(fmt.Sprintf("%s=%s", name, labels[name])) + } + b.WriteString("]") + return b.String() +} + +type Replicas []*Replica + +// Group blocks by their global configs and resolution into different replicas. 
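+// The replica label itself is dropped from the grouping key; instead the block's
+// downsampling resolution is added under the "resolution" key. Blocks that are
+// missing the replica label cause an error.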
+func NewReplicas(replicaLabelName string, blocks []*metadata.Meta) (Replicas, error) { + m := make(map[string]map[string][]*metadata.Meta, 0) + groupLabels := make(map[string]map[string]string) + for _, b := range blocks { + name, ok := b.Thanos.Labels[replicaLabelName] + if !ok { + return nil, errors.Errorf("not found replica label '%s' on block: %s", replicaLabelName, b.ULID.String()) + } + labels := replicaLabels(replicaLabelName, b) + group := replicaGroup(labels) + groupLabels[group] = labels + if _, ok := m[group]; !ok { + m[group] = make(map[string][]*metadata.Meta, 0) + } + m[group][name] = append(m[group][name], b) + } + replicas := make(Replicas, 0) + for group, v := range m { + for name, blocks := range v { + replicas = append(replicas, NewReplica(name, groupLabels[group], blocks)) + } + } + return replicas, nil +} + +func replicaLabels(replicaLabelName string, b *metadata.Meta) map[string]string { + labels := make(map[string]string) + for k, v := range b.Thanos.Labels { + if k == replicaLabelName { + continue + } + labels[k] = v + } + labels[ResolutionLabel] = fmt.Sprint(b.Thanos.Downsample.Resolution) + return labels +} + +type ReplicaSyncer struct { + logger log.Logger + metrics *DedupMetrics + bkt objstore.Bucket + consistencyDelay time.Duration + blockSyncConcurrency int + + labelName string + mtx sync.Mutex + blocksMtx sync.Mutex +} + +func NewReplicaSyncer(logger log.Logger, metrics *DedupMetrics, bkt objstore.Bucket, labelName string, + consistencyDelay time.Duration, blockSyncConcurrency int) *ReplicaSyncer { + return &ReplicaSyncer{ + logger: logger, + metrics: metrics, + bkt: bkt, + labelName: labelName, + consistencyDelay: consistencyDelay, + blockSyncConcurrency: blockSyncConcurrency, + } +} + +func (s *ReplicaSyncer) Sync(ctx context.Context) (Replicas, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + var wg sync.WaitGroup + defer wg.Wait() + + blocks := make(map[ulid.ULID]*metadata.Meta) + metaIdsChan := make(chan ulid.ULID) + errChan := make(chan error, s.blockSyncConcurrency) + + syncCtx, cancel := context.WithCancel(ctx) + defer cancel() + + for i := 0; i < s.blockSyncConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for id := range metaIdsChan { + s.blocksMtx.Lock() + _, seen := blocks[id] + s.blocksMtx.Unlock() + if seen { + continue + } + s.metrics.syncMetas.WithLabelValues(s.bkt.Name()).Inc() + begin := time.Now() + meta, err := s.download(syncCtx, id) + s.metrics.syncMetaDuration.WithLabelValues(s.bkt.Name()).Observe(time.Since(begin).Seconds()) + if err != nil { + errChan <- err + s.metrics.syncMetaFailures.WithLabelValues(s.bkt.Name(), id.String()).Inc() + return + } + if meta == nil { + continue + } + s.blocksMtx.Lock() + blocks[id] = meta + s.blocksMtx.Unlock() + } + }() + } + + remote := map[ulid.ULID]struct{}{} + + err := s.bkt.Iter(ctx, "", func(name string) error { + id, ok := block.IsBlockDir(name) + if !ok { + return nil + } + + remote[id] = struct{}{} + + select { + case <-ctx.Done(): + case metaIdsChan <- id: + } + return nil + }) + + close(metaIdsChan) + + if err != nil { + return nil, compact.Retry(errors.Wrapf(err, "sync block metas from bucket %s", s.bkt.Name())) + } + + wg.Wait() + close(errChan) + + if err := <-errChan; err != nil { + return nil, compact.Retry(errors.Wrapf(err, "download block metas from bucket %s", s.bkt.Name())) + } + + result := make([]*metadata.Meta, 0) + for id, b := range blocks { + if _, ok := remote[id]; ok { + result = append(result, b) + } + } + return NewReplicas(s.labelName, result) +} + 
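+// download fetches the block's meta.json from the bucket. Blocks that are younger
+// than the configured consistency delay are considered too fresh and are skipped
+// (a nil meta is returned) so that blocks still being uploaded are not deduplicated.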
+func (s *ReplicaSyncer) download(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { + meta, err := block.DownloadMeta(ctx, s.logger, s.bkt, id) + if err != nil { + s.metrics.operateRemoteStorageFailures.WithLabelValues("get", s.bkt.Name(), id.String()).Inc() + return nil, compact.Retry(errors.Wrapf(err, "downloading block meta %s", id)) + } + if ulid.Now()-id.Time() < uint64(s.consistencyDelay/time.Millisecond) { + level.Debug(s.logger).Log("msg", "block is too fresh for now", "consistency-delay", s.consistencyDelay, "block", id) + return nil, nil + } + level.Debug(s.logger).Log("msg", "downloaded block meta", "block", id) + return &meta, nil +} diff --git a/pkg/compact/dedup/replica_test.go b/pkg/compact/dedup/replica_test.go new file mode 100644 index 00000000000..88f3f661e6d --- /dev/null +++ b/pkg/compact/dedup/replica_test.go @@ -0,0 +1,137 @@ +package dedup + +import ( + "context" + "io/ioutil" + "math/rand" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/block/metadata" + "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/improbable-eng/thanos/pkg/objstore/inmem" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/labels" +) + +func TestNewReplicas(t *testing.T) { + blocks := make([]*metadata.Meta, 0) + blocks = append(blocks, mockMeta(t, "s0", "r0", 0, 100, 200)) + blocks = append(blocks, mockMeta(t, "s0", "r0", 0, 200, 300)) + blocks = append(blocks, mockMeta(t, "s1", "r0", 0, 100, 200)) + blocks = append(blocks, mockMeta(t, "s0", "r0", 1, 100, 200)) + replicas, err := NewReplicas("replica", blocks) + testutil.Ok(t, err) + testutil.Assert(t, len(replicas) == 3, "new replicas failed") + for _, r := range replicas { + switch group := r.Group(); group { + case "[resolution=0,shard=s0]": + testutil.Assert(t, len(r.Blocks) == 2, "new replicas failed") + testutil.Assert(t, r.Name == "r0", "new replicas failed") + testutil.Assert(t, r.MinTime() == 100, "new replicas failed") + testutil.Assert(t, r.MaxTime() == 300, "new replicas failed") + case "[resolution=0,shard=s1]": + testutil.Assert(t, len(r.Blocks) == 1, "new replicas failed") + testutil.Assert(t, r.Name == "r0", "new replicas failed") + testutil.Assert(t, r.MinTime() == 100, "new replicas failed") + testutil.Assert(t, r.MaxTime() == 200, "new replicas failed") + default: + testutil.Assert(t, len(r.Blocks) == 1, "new replicas failed") + testutil.Assert(t, r.Name == "r0", "new replicas failed") + testutil.Assert(t, r.MinTime() == 100, "new replicas failed") + testutil.Assert(t, r.MaxTime() == 200, "new replicas failed") + } + for i := range r.Blocks { + if i == 0 { + continue + } + testutil.Assert(t, r.Blocks[i].MinTime > r.Blocks[i - 1].MinTime, "new replicas failed") + } + } +} + +func TestReplicaSyncer_Sync(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + bkt := mockObjectStoreBucket(t, ctx, logger) + + syncer := NewReplicaSyncer(logger, NewDedupMetrics(reg), bkt, "replica", 0, 1) + + replicas, err := syncer.Sync(ctx) + testutil.Ok(t, err) + testutil.Assert(t, len(replicas) == 2, "replica syncer failed") + testutil.Assert(t, len(replicas[0].Blocks) == 1, "replica syncer failed") + testutil.Assert(t, len(replicas[1].Blocks) == 1, "replica syncer failed") +} + +func mockObjectStoreBucket(t 
*testing.T, ctx context.Context, logger log.Logger) objstore.Bucket { + dataDir, err := ioutil.TempDir("", "thanos-dedup-bucket") + testutil.Ok(t, err) + bkt := inmem.NewBucket() + + id0 := createBlock(t, ctx, dataDir, "r0") + err = objstore.UploadDir(ctx, logger, bkt, filepath.Join(dataDir, id0.String()), id0.String()) + testutil.Ok(t, err) + + id1 := createBlock(t, ctx, dataDir, "r1") + err = objstore.UploadDir(ctx, logger, bkt, filepath.Join(dataDir, id1.String()), id1.String()) + testutil.Ok(t, err) + + return bkt +} + +func createBlock(t *testing.T, ctx context.Context, dataDir string, replica string) ulid.ULID { + globalConfigs := make(map[string]string) + globalConfigs["shard"] = "s0" + globalConfigs["replica"] = replica + + var lset []labels.Labels + if replica == "r0" { + lset = []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + } + } else { + lset = []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "b", Value: "1"}}, + {{Name: "b", Value: "2"}}, + } + } + id, err := testutil.CreateBlock(ctx, dataDir, lset, 100, 0, 1000, labels.FromMap(globalConfigs), 0) + testutil.Ok(t, err) + return id +} + +func mockMeta(t *testing.T, shard, replica string, resolution, minTime, maxTime int64) *metadata.Meta { + meta := metadata.Meta{} + meta.Version = metadata.MetaVersion1 + meta.BlockMeta = tsdb.BlockMeta{ + ULID: ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))), + MinTime: minTime, + MaxTime: maxTime, + } + labels := make(map[string]string) + labels["shard"] = shard + labels["replica"] = replica + meta.Thanos = metadata.Thanos{ + Labels: labels, + Downsample: metadata.ThanosDownsample{ + Resolution: resolution, + }, + Source: metadata.TestSource, + } + return &meta +} diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index e7b09437ecb..19439792fff 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -75,10 +75,15 @@ func Downsample( newMeta := *origMeta newMeta.Thanos.Downsample.Resolution = resolution newMeta.ULID = uid + newMeta.Thanos.Source = metadata.CompactorSource // Writes downsampled chunks right into the files, avoiding excess memory allocation. // Flushes index and meta data after aggregations. - streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta) + symbols, err := indexr.Symbols() + if err != nil { + return id, errors.Wrap(err, "get index symbols") + } + streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, symbols, logger, newMeta) if err != nil { return id, errors.Wrap(err, "get streamed block writer") } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index e6cf8c878e0..d0e782e4f65 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -58,7 +58,6 @@ type streamedBlockWriter struct { chunkWriter tsdb.ChunkWriter indexWriter tsdb.IndexWriter - indexReader tsdb.IndexReader closers []io.Closer labelsValues labelsValues // labelsValues list of used label sets: name -> []values. @@ -75,7 +74,7 @@ type streamedBlockWriter struct { // exception, not a general case. 
func NewStreamedBlockWriter( blockDir string, - indexReader tsdb.IndexReader, + symbols map[string]struct{}, logger log.Logger, originMeta metadata.Meta, ) (w *streamedBlockWriter, err error) { @@ -105,11 +104,6 @@ func NewStreamedBlockWriter( } closers = append(closers, indexWriter) - symbols, err := indexReader.Symbols() - if err != nil { - return nil, errors.Wrap(err, "read symbols") - } - err = indexWriter.AddSymbols(symbols) if err != nil { return nil, errors.Wrap(err, "add symbols") @@ -118,7 +112,6 @@ func NewStreamedBlockWriter( return &streamedBlockWriter{ logger: logger, blockDir: blockDir, - indexReader: indexReader, indexWriter: indexWriter, chunkWriter: chunkWriter, meta: originMeta, @@ -217,7 +210,7 @@ func (w *streamedBlockWriter) Close() error { // No error, claim success. level.Info(w.logger).Log( - "msg", "finalized downsampled block", + "msg", "write block", "mint", w.meta.MinTime, "maxt", w.meta.MaxTime, "ulid", w.meta.ULID, @@ -269,7 +262,7 @@ func (w *streamedBlockWriter) writeMemPostings() error { // writeMetaFile writes meta file. func (w *streamedBlockWriter) writeMetaFile() error { w.meta.Version = metadata.MetaVersion1 - w.meta.Thanos.Source = metadata.CompactorSource + w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples w.meta.Stats.NumSeries = w.postings diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 8e54d61e683..5cc6512278c 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -388,7 +388,7 @@ func (s *dedupSeries) Labels() labels.Labels { func (s *dedupSeries) Iterator() (it storage.SeriesIterator) { it = s.replicas[0].Iterator() for _, o := range s.replicas[1:] { - it = newDedupSeriesIterator(it, o.Iterator()) + it = NewDedupSeriesIterator(it, o.Iterator()) } return it } @@ -403,7 +403,7 @@ type dedupSeriesIterator struct { useA bool } -func newDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator { +func NewDedupSeriesIterator(a, b storage.SeriesIterator) *dedupSeriesIterator { return &dedupSeriesIterator{ a: a, b: b, diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 9a7999e3848..ac053eb7454 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -284,7 +284,7 @@ func TestDedupSeriesIterator(t *testing.T) { } for i, c := range cases { t.Logf("case %d:", i) - it := newDedupSeriesIterator( + it := NewDedupSeriesIterator( &SampleIterator{l: c.a, i: -1}, &SampleIterator{l: c.b, i: -1}, ) @@ -295,7 +295,7 @@ func TestDedupSeriesIterator(t *testing.T) { func BenchmarkDedupSeriesIterator(b *testing.B) { run := func(b *testing.B, s1, s2 []sample) { - it := newDedupSeriesIterator( + it := NewDedupSeriesIterator( &SampleIterator{l: s1, i: -1}, &SampleIterator{l: s2, i: -1}, )
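
A worked example may make the windowing logic in NewBlockGroups easier to review. The sketch below is written in the style of TestNewBlockGroups, assuming it sits next to the mockMeta helper in replica_test.go with a *testing.T named t; it is an illustration, not code from this patch:

	// Two replicas of the same shard: r0 owns [100,300) and [300,400), r1 owns a
	// single block [200,400). Blocks are ordered by descending duration (ties by
	// earlier MinTime) and each claims only the part of its range not yet covered.
	replicas, err := NewReplicas("replica", []*metadata.Meta{
		mockMeta(t, "s0", "r0", 0, 100, 300),
		mockMeta(t, "s0", "r0", 0, 300, 400),
		mockMeta(t, "s0", "r1", 0, 200, 400),
	})
	testutil.Ok(t, err)
	groups := NewBlockGroups(replicas)
	// groups now holds window [100,300) with 2 blocks and window [300,400) with
	// 2 blocks, matching the last case of TestNewBlockGroups above.

The same grouping drives deduplication both inside the compactor (only when --enable-dedup and --dedup.replica-label are both set) and in the standalone "thanos dedup" command added by this patch.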