Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact: Offline deduplication #1275

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1253](https://github.com/improbable-eng/thanos/pull/1253) Add support for specifying a maximum amount of retries when using Azure Blob storage (default: no retries).

- [#1275](https://github.com/improbable-eng/thanos/pull/1275) Added `offline deduplication` function in Thanos compactor.

## [v0.5.0](https://github.com/improbable-eng/thanos/releases/tag/v0.5.0) - 2019.06.05

TL;DR: Store LRU cache is no longer leaking, Upgraded Thanos UI to Prometheus 2.9, Fixed auto-downsampling, Moved to Go 1.12.5 and more.
Expand Down
20 changes: 20 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/dedup"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
Expand Down Expand Up @@ -110,6 +111,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

enableDedup := cmd.Flag("enable-dedup", "Enable dedup function, but effect depends on 'dedup.replica-label' config").Default("false").Bool()
dedupReplicaLabel := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.").String()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -130,6 +134,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*enableDedup,
*dedupReplicaLabel,
)
}
}
Expand All @@ -152,6 +158,8 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
enableDedup bool,
dedupReplicaLabel string,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -210,6 +218,7 @@ func runCompact(
}

var (
dedupDir = path.Join(dataDir, "dedup")
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
indexCacheDir = path.Join(dataDir, "index_cache")
Expand All @@ -234,7 +243,14 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, dedupReplicaLabel, consistencyDelay, blockSyncConcurrency)

f := func() error {
if isEnableDedup(enableDedup, dedupReplicaLabel) {
if err := deduper.Dedup(ctx); err != nil {
return errors.Wrap(err, "dedup failed")
}
}
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
}
Expand Down Expand Up @@ -443,3 +459,7 @@ func generateIndexCacheFile(
}
return nil
}

// isEnableDedup reports whether the dedup step should run: the enable flag
// must be set and a non-empty replica label must have been supplied.
func isEnableDedup(enableDedup bool, dedupReplicaLabel string) bool {
	if !enableDedup {
		return false
	}
	return dedupReplicaLabel != ""
}
66 changes: 66 additions & 0 deletions cmd/thanos/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/compact/dedup"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerDedup declares the dedup sub-command and its flags on app, and
// stores under name in m the setup function that hands the parsed flag values
// to runDedup.
func registerDedup(m map[string]setupFunc, app *kingpin.Application, name string) {
	cmd := app.Command(name, "continuously dedup blocks in an object store bucket")

	// Flags are declared in a fixed order because that order is what the
	// generated help output shows.
	dataDirFlag := cmd.Flag("data-dir", "Data directory in which to cache blocks and process deduplication.")
	dataDir := dataDirFlag.Default("./data").String()

	replicaLabelFlag := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.")
	replicaLabel := replicaLabelFlag.Required().String()

	delayFlag := cmd.Flag("consistency-delay", "Minimum age of fresh (non-dedup) blocks before they are being processed.")
	consistencyDelay := modelDuration(delayFlag.Default("30m"))

	syncConcurrencyFlag := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.")
	blockSyncConcurrency := syncConcurrencyFlag.Default("20").Int()

	objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

	// The flag pointers are dereferenced only when the setup function runs,
	// not at registration time.
	setup := func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
		return runDedup(
			g,
			logger,
			reg,
			*dataDir,
			*replicaLabel,
			time.Duration(*consistencyDelay),
			*blockSyncConcurrency,
			objStoreConfig,
			name,
		)
	}
	m[name] = setup
}

// runDedup builds the bucket client from objStoreConfig and adds a single
// actor to g that performs one Dedup call against that bucket, using
// <dataDir>/dedup as the working directory.
//
// An error is returned if the object store configuration cannot be read or the
// bucket client cannot be created. The actor's interrupt function cancels the
// context passed to Dedup, and the bucket client is closed when the actor
// returns.
func runDedup(g *run.Group, logger log.Logger, reg *prometheus.Registry, dataDir string, replicaLabel string,
	consistencyDelay time.Duration, blockSyncConcurrency int, objStoreConfig *pathOrContent, component string) error {
	bucketConf, err := objStoreConfig.Content()
	if err != nil {
		return err
	}

	bkt, err := client.NewBucket(logger, bucketConf, reg, component)
	if err != nil {
		// Close a bucket that was returned alongside the error.
		if bkt != nil {
			runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
		}
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	workDir := filepath.Join(dataDir, "dedup")

	execute := func() error {
		defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

		worker := dedup.NewBucketDeduper(logger, reg, bkt, workDir, replicaLabel, consistencyDelay, blockSyncConcurrency)
		return worker.Dedup(ctx)
	}
	interrupt := func(error) {
		cancel()
	}
	g.Add(execute, interrupt)

	return nil
}
1 change: 1 addition & 0 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func main() {
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")
registerChecks(cmds, app, "check")
registerDedup(cmds, app, "dedup")

cmd, err := app.Parse(os.Args[1:])
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@ Flags:
metadata from object storage.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--enable-dedup Enable dedup function, but effect depends on
'dedup.replica-label' config
--dedup.replica-label=DEDUP.REPLICA-LABEL
Label to treat as a replica indicator along which
data is deduplicated.

```
18 changes: 9 additions & 9 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
})
close(metaIDsChan)
if err != nil {
return retry(errors.Wrap(err, "retrieve bucket block metas"))
return Retry(errors.Wrap(err, "retrieve bucket block metas"))
}

wg.Wait()
close(errChan)

if err := <-errChan; err != nil {
return retry(err)
return Retry(err)
}

// Delete all local block dirs that no longer exist in the bucket.
Expand Down Expand Up @@ -459,7 +459,7 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
err := block.Delete(delCtx, c.bkt, id)
cancel()
if err != nil {
return retry(errors.Wrapf(err, "delete block %s from bucket", id))
return Retry(errors.Wrapf(err, "delete block %s from bucket", id))
}

// Immediately update our in-memory state so no further call to SyncMetas is needed
Expand Down Expand Up @@ -633,7 +633,7 @@ type RetryError struct {
err error
}

func retry(err error) error {
func Retry(err error) error {
if IsHaltError(err) {
return err
}
Expand Down Expand Up @@ -716,7 +716,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,

bdir := filepath.Join(tmpdir, ie.id.String())
if err := block.Download(ctx, logger, bkt, ie.id, bdir); err != nil {
return retry(errors.Wrapf(err, "download block %s", ie.id))
return Retry(errors.Wrapf(err, "download block %s", ie.id))
}

meta, err := metadata.Read(bdir)
Expand All @@ -736,7 +736,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,

level.Info(logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, logger, bkt, filepath.Join(tmpdir, resid.String())); err != nil {
return retry(errors.Wrapf(err, "upload of %s failed", resid))
return Retry(errors.Wrapf(err, "upload of %s failed", resid))
}

level.Info(logger).Log("msg", "deleting broken block", "id", ie.id)
Expand Down Expand Up @@ -818,7 +818,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id))
return false, ulid.ULID{}, Retry(errors.Wrapf(err, "download block %s", id))
}

// Ensure all input blocks are valid.
Expand Down Expand Up @@ -904,7 +904,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
return false, ulid.ULID{}, Retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))

Expand All @@ -913,7 +913,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, b := range plan {
if err := cg.deleteBlock(b); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket"))
return false, ulid.ULID{}, Retry(errors.Wrapf(err, "delete old block from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestHaltMultiError(t *testing.T) {
}

func TestRetryMultiError(t *testing.T) {
retryErr := retry(errors.New("retry error"))
retryErr := Retry(errors.New("retry error"))
nonRetryErr := errors.New("not a retry error")

errs := terrors.MultiError{nonRetryErr}
Expand All @@ -57,16 +57,16 @@ func TestRetryError(t *testing.T) {
err := errors.New("test")
testutil.Assert(t, !IsRetryError(err), "retry error")

err = retry(errors.New("test"))
err = Retry(errors.New("test"))
testutil.Assert(t, IsRetryError(err), "not a retry error")

err = errors.Wrap(retry(errors.New("test")), "something")
err = errors.Wrap(Retry(errors.New("test")), "something")
testutil.Assert(t, IsRetryError(err), "not a retry error")

err = errors.Wrap(errors.Wrap(retry(errors.New("test")), "something"), "something2")
err = errors.Wrap(errors.Wrap(Retry(errors.New("test")), "something"), "something2")
testutil.Assert(t, IsRetryError(err), "not a retry error")

err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2")
err = errors.Wrap(Retry(errors.Wrap(halt(errors.New("test")), "something")), "something2")
testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error")
}

Expand Down
73 changes: 73 additions & 0 deletions pkg/compact/dedup/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package dedup

import (
"context"
"os"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/pkg/errors"
)

// BucketDeduper runs deduplication of replica blocks held in an object store
// bucket. It pairs a ReplicaSyncer, which supplies the replicas, with a
// ReplicaMerger, which merges each group of them.
type BucketDeduper struct {
	// logger receives the progress messages written by Dedup.
	logger log.Logger
	// dedupDir is the local working directory; Dedup removes and recreates it
	// at the start of every run.
	dedupDir string
	// replicaLabelName is the replica label name this deduper was built with.
	replicaLabelName string
	// bkt is the bucket being deduplicated; its name is used as a metric label.
	bkt objstore.Bucket

	// metrics holds the counters Dedup increments per group and per failure.
	metrics *DedupMetrics

	// syncer provides the replicas to process through its Sync method.
	syncer *ReplicaSyncer
	// merger processes one group of replicas per Merge call.
	merger *ReplicaMerger
}

// NewBucketDeduper returns a BucketDeduper for bkt that works in dedupDir.
//
// It creates one DedupMetrics from reg and shares it with the ReplicaSyncer
// and ReplicaMerger it constructs. consistencyDelay and blockSyncConcurrency
// are passed only to the syncer; dedupDir is passed only to the merger.
func NewBucketDeduper(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, dedupDir, replicaLabelName string,
	consistencyDelay time.Duration, blockSyncConcurrency int) *BucketDeduper {
	m := NewDedupMetrics(reg)

	d := &BucketDeduper{
		logger:           logger,
		dedupDir:         dedupDir,
		replicaLabelName: replicaLabelName,
		bkt:              bkt,
		metrics:          m,
	}
	// Built in the same order as before: syncer first, then merger.
	d.syncer = NewReplicaSyncer(logger, m, bkt, replicaLabelName, consistencyDelay, blockSyncConcurrency)
	d.merger = NewReplicaMerger(logger, m, bkt, dedupDir, replicaLabelName)
	return d
}

// Dedup performs one deduplication pass over the bucket.
//
// It empties and recreates the working directory, asks the syncer for the
// current replicas, buckets them by their Group() key, and merges each group
// in turn. Groups are visited in map iteration order. The first failing step
// aborts the pass and its error is returned wrapped with context; a failed
// merge also increments the failure counter for that group.
func (d *BucketDeduper) Dedup(ctx context.Context) error {
	level.Info(d.logger).Log("msg", "start of deduplication")
	begin := time.Now()

	// Start from an empty working directory so nothing from an earlier run
	// is left behind.
	err := os.RemoveAll(d.dedupDir)
	if err != nil {
		return errors.Wrap(err, "clean up the dedup temporary directory")
	}
	err = os.MkdirAll(d.dedupDir, 0777)
	if err != nil {
		return errors.Wrap(err, "create the dedup temporary directory")
	}

	synced, err := d.syncer.Sync(ctx)
	if err != nil {
		return errors.Wrap(err, "sync replica metas")
	}

	// Collect replicas that share a group key so each group is merged once.
	byGroup := make(map[string]Replicas)
	for _, replica := range synced {
		key := replica.Group()
		byGroup[key] = append(byGroup[key], replica)
	}

	for name, members := range byGroup {
		level.Info(d.logger).Log("msg", "starting to dedup replicas", "group", name)
		d.metrics.deduplication.WithLabelValues(d.bkt.Name()).Inc()

		mergeErr := d.merger.Merge(ctx, members)
		if mergeErr != nil {
			d.metrics.deduplicationFailures.WithLabelValues(d.bkt.Name(), name).Inc()
			return errors.Wrapf(mergeErr, "merge replicas: %s", name)
		}

		level.Info(d.logger).Log("msg", "completed to dedup replicas", "group", name)
	}

	level.Info(d.logger).Log("msg", "deduplication process done", "duration", time.Since(begin))
	return nil
}
Loading