Skip to content

Commit

Permalink
cmd/thanos/compact: add bucket UI
Browse files Browse the repository at this point in the history
This commit enhances the compact component so that it runs the bucket UI
whenever the --wait flag is also passed. In order to reduce the overhead
of running the UI in addition to the compactor, this commit also
refactors the compactor and bucket commands a bit in order to re-use a
single meta fetcher.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
  • Loading branch information
squat committed Mar 24, 2020
1 parent d89a497 commit d2e5ed3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.

### Changed

Expand Down
47 changes: 22 additions & 25 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

comp := component.Bucket
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
Expand Down Expand Up @@ -365,10 +363,26 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
if err != nil {
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
statusProber.Ready()

return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig)
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return refresh(ctx, logger, bucketUI, *interval, *timeout, fetcher)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -432,7 +446,6 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
}

func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {

comp := component.Downsample
cmd := root.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

Expand All @@ -446,30 +459,14 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n
}
}

// refresh metadata from remote storage periodically and update UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return err
}

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// refresh metadata from remote storage periodically and update the UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, fetcher *block.MetaFetcher) error {
return runutil.Repeat(duration, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
defer iterCancel()

blocks, err := download(iterCtx, logger, bkt, fetcher)
blocks, err := download(iterCtx, logger, fetcher)
if err != nil {
bucketUI.Set("[]", err)
return err
Expand All @@ -486,7 +483,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
})
}

func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
func download(ctx context.Context, logger log.Logger, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
level.Info(logger).Log("msg", "synchronizing block metadata")

metas, _, err := fetcher.Fetch(ctx)
Expand Down
31 changes: 30 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
Expand All @@ -29,11 +30,13 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/ui"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -107,7 +110,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs. Only works when --wait flag specified.").
waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
Default("5m").Duration()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Expand Down Expand Up @@ -135,6 +138,15 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()
flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -159,6 +171,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*compactionConcurrency,
selectorRelabelConf,
*waitInterval,
*label,
flagsMap,
)
}
}
Expand All @@ -185,6 +199,8 @@ func runCompact(
concurrency int,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
label string,
flagsMap map[string]string,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -425,6 +441,19 @@ func runCompact(
cancel()
})

if wait {
router := route.New()
bucketUI := ui.NewBucketUI(logger, label, flagsMap)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)

g.Add(func() error {
return refresh(ctx, logger, bucketUI, waitInterval, time.Minute, metaFetcher)
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting compact node")
statusProber.Ready()
return nil
Expand Down
7 changes: 7 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,17 @@ Flags:
selecting blocks. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
<<<<<<< HEAD
--delete-delay=48h Time before a block marked for deletion is deleted from bucket.
If delete-delay is non zero, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket.
If delete-delay is 0, blocks will be deleted straight away.
Use this if you want to get rid of or move the block immediately.
Note that deleting blocks immediately can cause query failures, if store gateway still has the block
loaded, or compactor is ignoring the deletion because it's compacting the block at the same time.
=======
--bucket-web-label=BUCKET-WEB-LABEL
Prometheus label to use as timeline title in the
bucket web UI
>>>>>>> 68b4effb... cmd/thanos/compact: add bucket UI
```

0 comments on commit d2e5ed3

Please sign in to comment.