Skip to content

Commit

Permalink
cmd/thanos/compact: add bucket UI (#1714)
Browse files Browse the repository at this point in the history
This commit enhances the compact component so that it runs the bucket UI
whenever the --wait flag is also passed. In order to reduce the overhead
of running the UI in addition to the compactor, this commit also
refactors the compactor and bucket commands a bit in order to re-use a
single meta fetcher.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
  • Loading branch information
squat authored Mar 24, 2020
1 parent 65b49af commit 4bd19b1
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify the wait interval between consecutive compaction runs when `--wait` is enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples**, such as those produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Compactor: Run the bucket web UI in the compact component when it is run as a long-lived process (i.e. with `--wait`).

### Changed

Expand Down
47 changes: 22 additions & 25 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

comp := component.Bucket
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
Expand Down Expand Up @@ -365,10 +363,26 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
if err != nil {
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
statusProber.Ready()

return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig)
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return refresh(ctx, logger, bucketUI, *interval, *timeout, fetcher)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -432,7 +446,6 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
}

func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {

comp := component.Downsample
cmd := root.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

Expand All @@ -446,30 +459,14 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n
}
}

// refresh metadata from remote storage periodically and update UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// refresh periodically fetches block metadata from remote storage and updates the bucket UI with it.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, fetcher *block.MetaFetcher) error {
return runutil.Repeat(duration, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
defer iterCancel()

blocks, err := download(iterCtx, logger, bkt, fetcher)
blocks, err := download(iterCtx, logger, fetcher)
if err != nil {
bucketUI.Set("[]", err)
return err
Expand All @@ -486,7 +483,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
})
}

func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
func download(ctx context.Context, logger log.Logger, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
level.Info(logger).Log("msg", "synchronizing block metadata")

metas, _, err := fetcher.Fetch(ctx)
Expand Down
31 changes: 30 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
Expand All @@ -29,11 +30,13 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/ui"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -107,7 +110,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs. Only works when --wait flag specified.").
waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
Default("5m").Duration()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Expand Down Expand Up @@ -141,6 +144,15 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()
flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -166,6 +178,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*dedupReplicaLabels,
selectorRelabelConf,
*waitInterval,
*label,
flagsMap,
)
}
}
Expand Down Expand Up @@ -193,6 +207,8 @@ func runCompact(
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
label string,
flagsMap map[string]string,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -442,6 +458,19 @@ func runCompact(
cancel()
})

if wait {
router := route.New()
bucketUI := ui.NewBucketUI(logger, label, flagsMap)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)

g.Add(func() error {
return refresh(ctx, logger, bucketUI, waitInterval, time.Minute, metaFetcher)
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting compact node")
statusProber.Ready()
return nil
Expand Down
Loading

0 comments on commit 4bd19b1

Please sign in to comment.