Skip to content

Commit

Permalink
feat: Added /-/healthy endpoint to Rule and the generic metrics HTTP …
Browse files Browse the repository at this point in the history
…server.

Signed-off-by: Martin Chodur <m.chodur@seznam.cz>
  • Loading branch information
FUSAKLA committed Jan 13, 2019
1 parent b6dcbf5 commit 32d55d7
Show file tree
Hide file tree
Showing 13 changed files with 483 additions and 69 deletions.
14 changes: 9 additions & 5 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func runCompact(
return errors.Wrap(err, "create compactor")
}

readinessProber, err := metricHTTPListenGroup(g, logger, reg, httpBindAddr, component)
if err != nil {
readinessProber.SetNotHealthy(err)
return err
}

var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
Expand Down Expand Up @@ -270,14 +276,12 @@ func runCompact(

return errors.Wrap(err, "error executing compaction")
})
}, func(error) {
}, func(err error) {
readinessProber.SetNotReady(err)
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
}

level.Info(logger).Log("msg", "starting compact node")
readinessProber.SetReady()
return nil
}
16 changes: 10 additions & 6 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/go-grpc-prometheus"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -294,21 +295,24 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
}

// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, component string) (*prober.Prober, error) {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber := prober.NewProbeInMux(component, mux)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
return nil, errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
return readinessProber, nil
}
69 changes: 69 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
)

func queryHTTPGetEndpoint(ctx context.Context, t *testing.T, logger log.Logger, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s", url), nil)
testutil.Ok(t, err)
return http.DefaultClient.Do(req.WithContext(ctx))
}

func TestGenericHttpEndpoints(t *testing.T) {
var g run.Group
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
metricsRegistry := prometheus.NewRegistry()
component := "sidecar"
ctx := context.Background()

freePort, err := testutil.FreePort()
testutil.Ok(t, err)

serverAddress := fmt.Sprintf("127.0.0.1:%d", freePort)

readinessProber, err := metricHTTPListenGroup(&g, logger, metricsRegistry, serverAddress, component)
testutil.Ok(t, err)
go func() { _ = g.Run() }()

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/healthy"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

readinessProber.SetReady()
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/-/ready"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/metrics"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))

testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
resp, err := queryHTTPGetEndpoint(ctx, t, log.NewNopLogger(), path.Join(serverAddress, "/debug/pprof/"))
testutil.Ok(t, err)
testutil.Equals(t, 200, resp.StatusCode)
return err
}))
}
30 changes: 17 additions & 13 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ import (
"time"

"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/prober"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-prometheus"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/query"
"github.com/improbable-eng/thanos/pkg/query/api"
v1 "github.com/improbable-eng/thanos/pkg/query/api"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/improbable-eng/thanos/pkg/ui"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
Expand All @@ -38,7 +39,7 @@ import (
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/alecthomas/kingpin.v2"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -128,6 +129,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*key,
*caCert,
*serverName,
name,
*httpBindAddr,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
Expand Down Expand Up @@ -240,6 +242,7 @@ func runQuery(
key string,
caCert string,
serverName string,
component string,
httpBindAddr string,
maxConcurrentQueries int,
queryTimeout time.Duration,
Expand All @@ -252,6 +255,7 @@ func runQuery(
fileSD *file.Discovery,
dnsSDInterval time.Duration,
) error {

duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_query_duplicated_store_address",
Help: "The number of times a duplicated store addresses is detected from the different configs in query",
Expand Down Expand Up @@ -372,6 +376,7 @@ func runQuery(
cancel()
})
}
var readinessProber *prober.Prober
// Start query API + UI HTTP server.
{
router := route.New()
Expand All @@ -380,12 +385,7 @@ func runQuery(
api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse)
api.Register(router.WithPrefix("/api/v1"), tracer, logger)

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})
readinessProber = prober.NewProbeInRouter(component, router)

mux := http.NewServeMux()
registerMetrics(mux, reg)
Expand All @@ -399,8 +399,10 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}
Expand All @@ -422,9 +424,11 @@ func runQuery(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
readinessProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
}, func(err error) {
s.Stop()
readinessProber.SetNotReady(err)
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}
Expand Down
62 changes: 36 additions & 26 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"time"

"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/prober"
opentracing "github.com/opentracing/opentracing-go"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -36,7 +39,6 @@ import (
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/improbable-eng/thanos/pkg/ui"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand All @@ -50,7 +52,6 @@ import (
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerRule registers a rule command.
Expand Down Expand Up @@ -502,37 +503,17 @@ func runRule(
cancel()
})
}
// Start gRPC server.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}
var readinessProber *prober.Prober
// Start UI & metrics HTTP server.
{
router := route.New()
router.Post("/-/reload", func(w http.ResponseWriter, r *http.Request) {
reload <- struct{}{}
})

readinessProber = prober.NewProbeInRouter(component, router)

ui.NewRuleUI(logger, mgr, alertQueryURL.String()).Register(router)

mux := http.NewServeMux()
Expand All @@ -547,12 +528,41 @@ func runRule(

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for ui requests", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}

// Start gRPC server.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)

g.Add(func() error {
readinessProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(err error) {
s.Stop()
readinessProber.SetNotReady(err)
runutil.CloseWithLogOnErr(logger, l, "store gRPC listener")
})
}

var uploads = true

bucketConfig, err := objStoreConfig.Content()
Expand Down
Loading

0 comments on commit 32d55d7

Please sign in to comment.