Meta-monitoring based active series limiting (#5520)
* Add initial PoC for meta-monitoring Receive active series limits

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Add e2e tests, rebase

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Add multitenant test + remake diagrams

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Implement suggestions; Make naming consistent; Rm/Add metrics

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Reuse meta-monitoring client

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Fix panic

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Cache meta-monitoring query result

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Fix lint

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Fail fast when limiting

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Implement suggestions: docs + mutex + struct

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Add interface and no-op

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Add changelog entry

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Add seriesLimitSupported to handler

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Remove tools fork

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>

* Change docs header

Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
saswatamcode authored Aug 2, 2022
1 parent 0febf14 commit dea54a7
Showing 9 changed files with 521 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants
- [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions.
- [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write.
- [#5520](https://github.com/thanos-io/thanos/pull/5520) Receive: Meta-monitoring based active series limiting

### Changed

39 changes: 39 additions & 0 deletions cmd/thanos/receive.go
@@ -6,6 +6,7 @@ package main
import (
"context"
"io/ioutil"
"net/url"
"os"
"path"
"strings"
@@ -185,6 +186,9 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

// Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided.
seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
@@ -214,6 +218,11 @@
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
SeriesLimitSupported: seriesLimitSupported,
MaxPerTenantLimit: conf.maxPerTenantLimit,
MetaMonitoringUrl: conf.metaMonitoringUrl,
MetaMonitoringHttpClient: conf.metaMonitoringHttpClient,
MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
@@ -297,6 +306,23 @@
)
}

if seriesLimitSupported {
level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(15*time.Second, ctx.Done(), func() error {
if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil {
level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error())
}
return nil
})
}, func(err error) {
cancel()
})
}
}

level.Debug(logger).Log("msg", "setting up periodic tenant pruning")
{
ctx, cancel := context.WithCancel(context.Background())
@@ -733,6 +759,11 @@ type receiveConfig struct {
rwClientServerCA string
rwClientServerName string

maxPerTenantLimit uint64
metaMonitoringLimitQuery string
metaMonitoringUrl *url.URL
metaMonitoringHttpClient *extflag.PathOrContent

dataDir string
labelStrs []string

@@ -831,6 +862,14 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit)

cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl)

cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery)

rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(cmd, "receive.tenant-limits.meta-monitoring-client", "YAML file or string with http client configs for meta-monitoring.", extflag.WithHidden())

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())
17 changes: 17 additions & 0 deletions docs/components/receive.md
@@ -115,6 +115,23 @@ The available request gates in Thanos Receive can be configured with the following

By default all gates are disabled.

## Active Series Limiting (experimental)

Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active (head) series to maintain the system's stability. It uses any Prometheus Query API compatible meta-monitoring solution that consumes the metrics exposed by all receivers in the Thanos system. Such a query endpoint provides the number of active series per tenant, at most a scrape interval old, which is then compared against the configured limit before ingesting any tenant's remote write request. If a tenant has gone above the limit, its remote write requests fail entirely.

Every Receive Router/RouterIngestor node queries meta-monitoring for the active series of all tenants every 15 seconds and caches the results in a map. This cached result is used to limit all incoming remote write requests (see the illustrative sketch below).

To use the feature, one should specify the following (hidden) flags:
- `--receive.tenant-limits.max-head-series`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
- `--receive.tenant-limits.meta-monitoring-url`: Specifies Prometheus Query API compatible meta-monitoring endpoint.
- `--receive.tenant-limits.meta-monitoring-query`: Optional flag to specify PromQL query to execute against meta-monitoring.
- `--receive.tenant-limits.meta-monitoring-client`: Optional YAML file/string specifying HTTP client config for meta-monitoring.

NOTE:
- It is possible for Receive to ingest more active series than the specified limit, as it relies on meta-monitoring, which may not always have the latest data on a tenant's current number of active series.
- Thanos Receive performs best-effort limiting. If meta-monitoring is down or unreachable, Thanos Receive will not impose limits and will only log errors about meta-monitoring being unreachable, similarly to when one receiver cannot be scraped.
- Support for different limit configuration for different tenants is planned for the future.
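To make the flow concrete, the following is a minimal, illustrative Go sketch (not part of this change) of the decision the limiter makes: it runs the default meta-monitoring query against a Prometheus Query API compatible endpoint and checks each tenant's current head-series count against a limit. It assumes the Prometheus Go client (`prometheus/client_golang`) is available; the endpoint address and the limit value are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
)

func main() {
	// Placeholder meta-monitoring endpoint; in practice this is whatever
	// --receive.tenant-limits.meta-monitoring-url points at.
	client, err := api.NewClient(api.Config{Address: "http://meta-monitoring.example:9090"})
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Default query of --receive.tenant-limits.meta-monitoring-query.
	val, _, err := v1.NewAPI(client).Query(ctx, "sum(prometheus_tsdb_head_series) by (tenant)", time.Now())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder limit; corresponds to --receive.tenant-limits.max-head-series.
	const limit = 1_000_000

	vec, ok := val.(model.Vector)
	if !ok {
		log.Fatalf("expected vector result, got %v", val.Type())
	}
	for _, sample := range vec {
		tenant := string(sample.Metric["tenant"])
		under := float64(sample.Value) < limit
		fmt.Printf("tenant=%s activeSeries=%v underLimit=%v\n", tenant, sample.Value, under)
	}
}
```

In the actual handler, this comparison is made against a cached result that is refreshed every 15 seconds rather than by querying meta-monitoring on every request.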

## Flags

```$ mdox-exec="thanos receive --help"
4 changes: 2 additions & 2 deletions go.mod
@@ -15,9 +15,9 @@ require (
github.com/chromedp/chromedp v0.8.2
github.com/davecgh/go-spew v1.1.1
github.com/dustin/go-humanize v1.0.0
github.com/efficientgo/e2e v0.12.1
github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.2.0
github.com/felixge/fgprof v0.9.2
8 changes: 5 additions & 3 deletions go.sum
@@ -296,13 +296,14 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/efficientgo/e2e v0.12.1 h1:ZYNTf09ptlba0I3ZStYaF7gCbevWdalriiX7usOSiFM=
github.com/efficientgo/e2e v0.12.1/go.mod h1:xDHUyIqAWyVWU29Lf+BaZoavW7xAbDEvTwHWWI/3bhk=
github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363 h1:Nw7SeBNMBrX3s0BbDlAWuGhEEDcKLteMsMmPThj4sxQ=
github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363/go.mod h1:0Jrqcog5+GlJkbC8ulPkgyRZwq+GsvjUlNt+B2swzJ8=
github.com/efficientgo/tools/core v0.0.0-20210129205121-421d0828c9a6/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b h1:ZHiD4/yE4idlbqvAO6iYCOYRzOMRpxkW+FKasRA3tsQ=
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b h1:rFV4ZGoCKjhOyc4vjrzuCsi9BbrxMJvwmtceN0iR4Zc=
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA=
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4=
github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM=
@@ -1011,6 +1012,7 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+
github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.36.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common/assets v0.2.0/go.mod h1:D17UVUE12bHbim7HzwUvtqm6gwBEaDQ0F+hIGbFbccI=
179 changes: 173 additions & 6 deletions pkg/receive/handler.go
@@ -12,16 +12,20 @@ import (
stdlog "log"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"sync"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/httpconfig"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/promclient"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -101,12 +105,23 @@ type Options struct {
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
SeriesLimitSupported bool
MaxPerTenantLimit uint64
MetaMonitoringUrl *url.URL
MetaMonitoringHttpClient *extflag.PathOrContent
MetaMonitoringLimitQuery string
WriteSeriesLimit int64
WriteSamplesLimit int64
WriteRequestSizeLimit int64
WriteRequestConcurrencyLimit int
}

// activeSeriesLimiter encompasses active series limiting logic.
type activeSeriesLimiter interface {
QueryMetaMonitoring(context.Context, log.Logger) error
isUnderLimit(string, log.Logger) (bool, error)
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
type Handler struct {
logger log.Logger
@@ -115,12 +130,13 @@ type Handler struct {
options *Options
listener net.Listener

mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
ActiveSeriesLimit activeSeriesLimiter

forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
@@ -219,6 +235,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
h.replicationFactor.Set(1)
}

h.ActiveSeriesLimit = NewNopSeriesLimit()
if h.options.SeriesLimitSupported {
h.ActiveSeriesLimit = NewActiveSeriesLimit(h.options, registerer, h.receiverMode, logger)
}

ins := extpromhttp.NewNopInstrumentationMiddleware()
if o.Registry != nil {
ins = extpromhttp.NewTenantInstrumentationMiddleware(
@@ -431,6 +452,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {

defer h.writeGate.Done()

under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger)
if err != nil {
level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error())
}

// Fail request fully if tenant has exceeded set limit.
if !under {
http.Error(w, "tenant is above active series limit", http.StatusTooManyRequests)
return
}

// ioutil.ReadAll dynamically adjusts the byte slice for read data, starting from 512B.
// Since this is the receive hot path, grow upfront, saving allocations and CPU time.
compressed := bytes.Buffer{}
@@ -534,6 +566,141 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

// activeSeriesLimit implements activeSeriesLimiter interface.
type activeSeriesLimit struct {
mtx sync.RWMutex
limit uint64
tenantCurrentSeriesMap map[string]float64

metaMonitoringURL *url.URL
metaMonitoringClient *http.Client
metaMonitoringQuery string

configuredTenantLimit prometheus.Gauge
limitedRequests *prometheus.CounterVec
metaMonitoringErr prometheus.Counter
}

func NewActiveSeriesLimit(o *Options, registerer prometheus.Registerer, r ReceiverMode, logger log.Logger) *activeSeriesLimit {
limit := &activeSeriesLimit{
limit: o.MaxPerTenantLimit,
metaMonitoringURL: o.MetaMonitoringUrl,
metaMonitoringQuery: o.MetaMonitoringLimitQuery,
configuredTenantLimit: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_tenant_head_series_limit",
Help: "The configured limit for active (head) series of tenants.",
},
),
limitedRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_head_series_limited_requests_total",
Help: "The total number of remote write requests that have been dropped due to active series limiting.",
}, []string{"tenant"},
),
metaMonitoringErr: promauto.With(registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_metamonitoring_failed_queries_total",
Help: "The total number of meta-monitoring queries that failed while limiting.",
},
),
}

limit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
limit.tenantCurrentSeriesMap = map[string]float64{}

// Use specified HTTPConfig to make requests to meta-monitoring.
httpConfContentYaml, err := o.MetaMonitoringHttpClient.Content()
if err != nil {
level.Error(logger).Log("msg", "getting http client config", "err", err.Error())
}

httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
if err != nil {
level.Error(logger).Log("msg", "parsing http config YAML", "err", err.Error())
}

limit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit")
if err != nil {
level.Error(logger).Log("msg", "improper http client config", "err", err.Error())
}

return limit
}

// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring
// solution with the configured query for getting current active (head) series of all tenants.
// It then populates the tenantCurrentSeriesMap with the result.
func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error {
c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent)

vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{})
if err != nil {
a.metaMonitoringErr.Inc()
return err
}

level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))

a.mtx.Lock()
defer a.mtx.Unlock()
// Construct map of tenant name and current HEAD series.
for _, e := range vectorRes {
for k, v := range e.Metric {
if k == "tenant" {
a.tenantCurrentSeriesMap[string(v)] = float64(e.Value)
level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
}
}
}

return nil
}

// isUnderLimit ensures that the current number of active series for a tenant does not exceed the given limit.
// It does so in a best-effort way, i.e., if meta-monitoring is unreachable, it does not impose limits.
// TODO(saswatamcode): Add capability to configure different limits for different tenants.
func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
a.mtx.RLock()
defer a.mtx.RUnlock()
if a.limit == 0 || a.metaMonitoringURL.Host == "" {
return true, nil
}

// In this limiting flow, we ingest the first remote write request
// and then check the meta-monitoring metric to ascertain the current active
// series. As this metric is updated at intervals, it is possible
// that Receive ingests more series than the limit before detecting that
// a tenant has exceeded the set limit.
v, ok := a.tenantCurrentSeriesMap[tenant]
if !ok {
return true, errors.New("tenant not in current series map")
}

if v >= float64(a.limit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", a.limit)
a.limitedRequests.WithLabelValues(tenant).Inc()
return false, nil
}

return true, nil
}

// nopSeriesLimit implements activeSeriesLimiter interface as no-op.
type nopSeriesLimit struct{}

func NewNopSeriesLimit() *nopSeriesLimit {
return &nopSeriesLimit{}
}

func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error {
return nil
}

func (a *nopSeriesLimit) isUnderLimit(_ string, _ log.Logger) (bool, error) {
return true, nil
}

// forward accepts a write request, batches its time series by
// corresponding endpoint, and forwards them in parallel to the
// correct endpoint. Requests destined for the local node are written
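The `activeSeriesLimiter` interface introduced in this handler keeps the per-request check decoupled from how the limit data is obtained, which is what allows the no-op fallback above and leaves room for the per-tenant limit configuration the docs note as planned. As a purely illustrative sketch (not part of this commit), a limiter with hard-coded per-tenant limits could satisfy the same interface from inside the `receive` package; the type name, the tenant data source, and the numbers are hypothetical.

```go
package receive

import (
	"context"
	"errors"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// staticSeriesLimit is a hypothetical limiter with fixed per-tenant limits.
// It needs no meta-monitoring round trip, so QueryMetaMonitoring is a no-op;
// current series counts would have to be fed in by some other component.
type staticSeriesLimit struct {
	limits  map[string]uint64  // per-tenant limits, e.g. {"tenant-a": 100000}
	current map[string]float64 // current head series per tenant
}

func (s *staticSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error {
	return nil
}

func (s *staticSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
	limit, ok := s.limits[tenant]
	if !ok {
		// No limit configured for this tenant: allow the request.
		return true, nil
	}
	v, ok := s.current[tenant]
	if !ok {
		return true, errors.New("tenant not in current series map")
	}
	if v >= float64(limit) {
		level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", limit)
		return false, nil
	}
	return true, nil
}
```

Because both methods mirror the signatures used by `activeSeriesLimit` and `nopSeriesLimit`, swapping in such an implementation would only require changing the assignment of `h.ActiveSeriesLimit` in `NewHandler`.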