-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
shuaizhang
committed
Jun 24, 2019
1 parent
38a9da0
commit dd8efef
Showing
18 changed files
with
1,696 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"path/filepath" | ||
"time" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/improbable-eng/thanos/pkg/compact/dedup" | ||
"github.com/improbable-eng/thanos/pkg/objstore/client" | ||
"github.com/improbable-eng/thanos/pkg/runutil" | ||
"github.com/oklog/run" | ||
"github.com/opentracing/opentracing-go" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
func registerDedup(m map[string]setupFunc, app *kingpin.Application, name string) { | ||
cmd := app.Command(name, "continuously dedup blocks in an object store bucket") | ||
|
||
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process deduplication."). | ||
Default("./data").String() | ||
|
||
replicaLabel := cmd.Flag("dedup.replica-label", "Label to treat as a replica indicator along which data is deduplicated.").Required(). | ||
String() | ||
|
||
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-dedup) blocks before they are being processed."). | ||
Default("30m")) | ||
|
||
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). | ||
Default("20").Int() | ||
|
||
objStoreConfig := regCommonObjStoreFlags(cmd, "", true) | ||
|
||
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { | ||
return runDedup(g, logger, reg, *dataDir, *replicaLabel, time.Duration(*consistencyDelay), *blockSyncConcurrency, objStoreConfig, name) | ||
} | ||
} | ||
|
||
func runDedup(g *run.Group, logger log.Logger, reg *prometheus.Registry, dataDir string, replicaLabel string, | ||
consistencyDelay time.Duration, blockSyncConcurrency int, objStoreConfig *pathOrContent, component string) error { | ||
confContentYaml, err := objStoreConfig.Content() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
bkt, err := client.NewBucket(logger, confContentYaml, reg, component) | ||
if err != nil { | ||
if bkt != nil { | ||
runutil.CloseWithLogOnErr(logger, bkt, "bucket client") | ||
} | ||
return err | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
dedupDir := filepath.Join(dataDir, "dedup") | ||
g.Add(func() error { | ||
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") | ||
|
||
deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, replicaLabel, consistencyDelay, blockSyncConcurrency) | ||
return deduper.Dedup(ctx) | ||
}, func(error) { | ||
cancel() | ||
}) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package dedup | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"time" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
"github.com/improbable-eng/thanos/pkg/objstore" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
type BucketDeduper struct { | ||
logger log.Logger | ||
dedupDir string | ||
replicaLabelName string | ||
bkt objstore.Bucket | ||
|
||
metrics *DedupMetrics | ||
|
||
syncer *ReplicaSyncer | ||
merger *ReplicaMerger | ||
} | ||
|
||
func NewBucketDeduper(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, dedupDir, replicaLabelName string, | ||
consistencyDelay time.Duration, blockSyncConcurrency int) *BucketDeduper { | ||
metrics := NewDedupMetrics(reg) | ||
return &BucketDeduper{ | ||
logger: logger, | ||
dedupDir: dedupDir, | ||
replicaLabelName: replicaLabelName, | ||
bkt: bkt, | ||
metrics: metrics, | ||
syncer: NewReplicaSyncer(logger, metrics, bkt, replicaLabelName, consistencyDelay, blockSyncConcurrency), | ||
merger: NewReplicaMerger(logger, metrics, bkt, dedupDir, replicaLabelName), | ||
} | ||
} | ||
|
||
func (d *BucketDeduper) Dedup(ctx context.Context) error { | ||
level.Info(d.logger).Log("msg", "start of deduplication") | ||
start := time.Now() | ||
if err := os.RemoveAll(d.dedupDir); err != nil { | ||
return errors.Wrap(err, "clean up the dedup temporary directory") | ||
} | ||
if err := os.MkdirAll(d.dedupDir, 0777); err != nil { | ||
return errors.Wrap(err, "create the dedup temporary directory") | ||
} | ||
|
||
replicas, err := d.syncer.Sync(ctx) | ||
if err != nil { | ||
return errors.Wrap(err, "sync replica metas") | ||
} | ||
|
||
groups := make(map[string]Replicas) | ||
for _, r := range replicas { | ||
group := r.Group() | ||
groups[group] = append(groups[group], r) | ||
} | ||
for k, v := range groups { | ||
level.Info(d.logger).Log("msg", "starting to dedup replicas", "group", k) | ||
d.metrics.deduplication.WithLabelValues(d.bkt.Name()).Inc() | ||
if err := d.merger.Merge(ctx, v); err != nil { | ||
d.metrics.deduplicationFailures.WithLabelValues(d.bkt.Name(), k).Inc() | ||
return errors.Wrapf(err, "merge replicas: %s", k) | ||
} | ||
level.Info(d.logger).Log("msg", "completed to dedup replicas", "group", k) | ||
} | ||
level.Info(d.logger).Log("msg", "deduplication process done", "duration", time.Since(start)) | ||
return nil | ||
} |
Oops, something went wrong.