Skip to content

Commit

Permalink
add dedup function in compactor
Browse files Browse the repository at this point in the history
  • Loading branch information
shuaizhang committed Jun 20, 2019
1 parent 38a9da0 commit 7cb350b
Show file tree
Hide file tree
Showing 18 changed files with 1,695 additions and 28 deletions.
20 changes: 20 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/dedup"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
Expand Down Expand Up @@ -110,6 +111,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

enableDedup := cmd.Flag("enable-dedup", "Enable dedup function, but effect depends on 'dedup.replica-label' config").Default("false").Bool()
dedupReplicaLabel := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -130,6 +134,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*enableDedup,
*dedupReplicaLabel,
)
}
}
Expand All @@ -152,6 +158,8 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
enableDedup bool,
dedupReplicaLabel string,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -210,6 +218,7 @@ func runCompact(
}

var (
dedupDir = path.Join(dataDir, "dedup")
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
indexCacheDir = path.Join(dataDir, "index_cache")
Expand All @@ -234,7 +243,14 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, dedupReplicaLabel, consistencyDelay, blockSyncConcurrency)

f := func() error {
if isEnableDedup(enableDedup, dedupReplicaLabel) {
if err := deduper.Dedup(ctx); err != nil {
return errors.Wrap(err, "dedup failed")
}
}
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
}
Expand Down Expand Up @@ -443,3 +459,7 @@ func generateIndexCacheFile(
}
return nil
}

func isEnableDedup(enableDedup bool, dedupReplicaLabel string) bool {
return enableDedup && len(dedupReplicaLabel) > 0
}
66 changes: 66 additions & 0 deletions cmd/thanos/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/compact/dedup"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

func registerDedup(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continuously dedup blocks in an object store bucket")

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process deduplication.").
Default("./data").String()

replicaLabel := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.").Required().
String()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-dedup) blocks before they are being processed.").
Default("30m"))

blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runDedup(g, logger, reg, *dataDir, *replicaLabel, time.Duration(*consistencyDelay), *blockSyncConcurrency, objStoreConfig, name)
}
}

func runDedup(g *run.Group, logger log.Logger, reg *prometheus.Registry, dataDir string, replicaLabel string,
consistencyDelay time.Duration, blockSyncConcurrency int, objStoreConfig *pathOrContent, component string) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
if err != nil {
if bkt != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
return err
}

ctx, cancel := context.WithCancel(context.Background())
dedupDir := filepath.Join(dataDir, "dedup")
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, replicaLabel, consistencyDelay, blockSyncConcurrency)
return deduper.Dedup(ctx)
}, func(error) {
cancel()
})
return nil
}
1 change: 1 addition & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func main() {
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")
registerChecks(cmds, app, "check")
registerDedup(cmds, app, "dedup")

cmd, err := app.Parse(os.Args[1:])
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
})
close(metaIDsChan)
if err != nil {
return retry(errors.Wrap(err, "retrieve bucket block metas"))
return Retry(errors.Wrap(err, "retrieve bucket block metas"))
}

wg.Wait()
close(errChan)

if err := <-errChan; err != nil {
return retry(err)
return Retry(err)
}

// Delete all local block dirs that no longer exist in the bucket.
Expand Down Expand Up @@ -459,7 +459,7 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
err := block.Delete(delCtx, c.bkt, id)
cancel()
if err != nil {
return retry(errors.Wrapf(err, "delete block %s from bucket", id))
return Retry(errors.Wrapf(err, "delete block %s from bucket", id))
}

// Immediately update our in-memory state so no further call to SyncMetas is needed
Expand Down Expand Up @@ -633,7 +633,7 @@ type RetryError struct {
err error
}

func retry(err error) error {
func Retry(err error) error {
if IsHaltError(err) {
return err
}
Expand Down Expand Up @@ -716,7 +716,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,

bdir := filepath.Join(tmpdir, ie.id.String())
if err := block.Download(ctx, logger, bkt, ie.id, bdir); err != nil {
return retry(errors.Wrapf(err, "download block %s", ie.id))
return Retry(errors.Wrapf(err, "download block %s", ie.id))
}

meta, err := metadata.Read(bdir)
Expand All @@ -736,7 +736,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,

level.Info(logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String())); err != nil {
return retry(errors.Wrapf(err, "upload of %s failed", resid))
return Retry(errors.Wrapf(err, "upload of %s failed", resid))
}

level.Info(logger).Log("msg", "deleting broken block", "id", ie.id)
Expand Down Expand Up @@ -818,7 +818,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id))
return false, ulid.ULID{}, Retry(errors.Wrapf(err, "download block %s", id))
}

// Ensure all input blocks are valid.
Expand Down Expand Up @@ -904,7 +904,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
return false, ulid.ULID{}, Retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))

Expand All @@ -913,7 +913,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, b := range plan {
if err := cg.deleteBlock(b); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket"))
return false, ulid.ULID{}, Retry(errors.Wrapf(err, "delete old block from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ func TestRetryError(t *testing.T) {
err := errors.New("test")
testutil.Assert(t, !IsRetryError(err), "retry error")

err = retry(errors.New("test"))
err = Retry(errors.New("test"))
testutil.Assert(t, IsRetryError(err), "not a retry error")

err = errors.Wrap(retry(errors.New("test")), "something")
err = errors.Wrap(Retry(errors.New("test")), "something")
testutil.Assert(t, IsRetryError(err), "not a retry error")

err = errors.Wrap(errors.Wrap(retry(errors.New("test")), "something"), "something2")
err = errors.Wrap(errors.Wrap(Retry(errors.New("test")), "something"), "something2")
testutil.Assert(t, IsRetryError(err), "not a retry error")

err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2")
err = errors.Wrap(Retry(errors.Wrap(halt(errors.New("test")), "something")), "something2")
testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error")
}

Expand Down
73 changes: 73 additions & 0 deletions pkg/compact/dedup/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package dedup

import (
"context"
"os"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/pkg/errors"
)

type BucketDeduper struct {
logger log.Logger
dedupDir string
replicaLabelName string
bkt objstore.Bucket

metrics *DedupMetrics

syncer *ReplicaSyncer
merger *ReplicaMerger
}

func NewBucketDeduper(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, dedupDir, replicaLabelName string,
consistencyDelay time.Duration, blockSyncConcurrency int) *BucketDeduper {
metrics := NewDedupMetrics(reg)
return &BucketDeduper{
logger: logger,
dedupDir: dedupDir,
replicaLabelName: replicaLabelName,
bkt: bkt,
metrics: metrics,
syncer: NewReplicaSyncer(logger, metrics, bkt, replicaLabelName, consistencyDelay, blockSyncConcurrency),
merger: NewReplicaMerger(logger, metrics, bkt, dedupDir, replicaLabelName),
}
}

func (d *BucketDeduper) Dedup(ctx context.Context) error {
level.Info(d.logger).Log("msg", "start of deduplication")
start := time.Now()
if err := os.RemoveAll(d.dedupDir); err != nil {
return errors.Wrap(err, "clean up the dedup temporary directory")
}
if err := os.MkdirAll(d.dedupDir, 0777); err != nil {
return errors.Wrap(err, "create the dedup temporary directory")
}

replicas, err := d.syncer.Sync(ctx)
if err != nil {
return errors.Wrap(err, "sync replica metas")
}

groups := make(map[string]Replicas)
for _, r := range replicas {
group := r.Group()
groups[group] = append(groups[group], r)
}
for k, v := range groups {
level.Info(d.logger).Log("msg", "starting to dedup replicas", "group", k)
d.metrics.deduplication.WithLabelValues(d.bkt.Name()).Inc()
if err := d.merger.Merge(ctx, v); err != nil {
d.metrics.deduplicationFailures.WithLabelValues(d.bkt.Name(), k).Inc()
return errors.Wrapf(err, "merge replicas: %s", k)
}
level.Info(d.logger).Log("msg", "completed to dedup replicas", "group", k)
}
level.Info(d.logger).Log("msg", "deduplication process done", "duration", time.Since(start))
return nil
}
Loading

0 comments on commit 7cb350b

Please sign in to comment.