Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/thanos/compact: add bucket UI #1714

Merged
merged 1 commit into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.

### Changed

Expand Down
47 changes: 22 additions & 25 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

comp := component.Bucket
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
Expand Down Expand Up @@ -365,10 +363,26 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
if err != nil {
return errors.Wrap(err, "bucket client")
}

squat marked this conversation as resolved.
Show resolved Hide resolved
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
statusProber.Ready()

return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig)
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return refresh(ctx, logger, bucketUI, *interval, *timeout, fetcher)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -432,7 +446,6 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
}

func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {

comp := component.Downsample
cmd := root.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

Expand All @@ -446,30 +459,14 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n
}
}

// refresh metadata from remote storage periodically and update UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// refresh metadata from remote storage periodically and update the UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, fetcher *block.MetaFetcher) error {
return runutil.Repeat(duration, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
defer iterCancel()

blocks, err := download(iterCtx, logger, bkt, fetcher)
blocks, err := download(iterCtx, logger, fetcher)
if err != nil {
bucketUI.Set("[]", err)
return err
Expand All @@ -486,7 +483,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
})
}

func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
func download(ctx context.Context, logger log.Logger, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
level.Info(logger).Log("msg", "synchronizing block metadata")

metas, _, err := fetcher.Fetch(ctx)
Expand Down
31 changes: 30 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
Expand All @@ -29,11 +30,13 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/ui"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -107,7 +110,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs. Only works when --wait flag specified.").
waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
Default("5m").Duration()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Expand Down Expand Up @@ -141,6 +144,15 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String()
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String()
flagsMap := map[string]string{
"web.external-prefix": *webExternalPrefix,
"web.prefix-header": *webPrefixHeaderName,
}

label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -166,6 +178,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*dedupReplicaLabels,
selectorRelabelConf,
*waitInterval,
*label,
flagsMap,
)
}
}
Expand Down Expand Up @@ -193,6 +207,8 @@ func runCompact(
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
label string,
flagsMap map[string]string,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -442,6 +458,19 @@ func runCompact(
cancel()
})

if wait {
router := route.New()
bucketUI := ui.NewBucketUI(logger, label, flagsMap)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)

g.Add(func() error {
return refresh(ctx, logger, bucketUI, waitInterval, time.Minute, metaFetcher)
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting compact node")
statusProber.Ready()
return nil
Expand Down
Loading