diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 785ff49fbf2..b40f771c413 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -299,12 +299,12 @@ func runReceive( } if (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 { - level.Debug(logger).Log("msg", "setting up periodic meta-monitoring query for limiting cache") + level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache") { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(15*time.Second, ctx.Done(), func() error { - if err := webHandler.QueryMetaMonitoring(ctx); err != nil { + if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil { level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error()) } return nil @@ -848,16 +848,17 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor) - cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active or HEAD series that a tenant is allowed to have within a Receive topology.").Uint64Var(&rc.maxPerTenantLimit) + cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit) - cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Default("http://localhost:9090").URLVar(&rc.metaMonitoringUrl) + cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl) - cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").StringVar(&rc.metaMonitoringLimitQuery) + cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery) rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent( cmd, "receive.tenant-limits.meta-monitoring-client", "YAML file or string with http client configs for meta-monitoring.", + extflag.WithHidden(), ) rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) diff --git a/docs/components/receive.md b/docs/components/receive.md index 7f6aa824a89..89d2c112993 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -79,19 +79,20 @@ With such configuration any receive listens for remote write on `10908/api/v ## Limiting -Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active or HEAD series, to maintain stability of the system. 
It uses any Prometheus Query API compatible meta-monitoring solution to get the current number of active series, and compares that with a configured limit, before ingesting any tenant's remote write request. In case a tenant has gone above the limit, their remote write requests are failed fully.
+Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active (head) series to maintain the system's stability. It uses any Prometheus Query API compatible meta-monitoring solution that consumes the metrics exposed by all receivers in the Thanos system. Such a query endpoint makes it possible to get the number of active series per tenant, at most one scrape interval old, which is then compared with a configured limit before ingesting any tenant's remote write request. In case a tenant has gone above the limit, their remote write requests fail fully.
 
-Meta-monitoring in this context refers to an external monitoring system scraping all Thanos Receive instances and exposing them in an API compatible with the Prometheus Query API.
+Every Receive Router/RouterIngestor node queries meta-monitoring for the active series of all tenants every 15 seconds and caches the results in a map. This cached result is used to limit all incoming remote write requests.
 
-To use the feature, one should specify the following flags:
-- `--receive.tenant-limits.max-head-series`: Specifies the total number of active or HEAD series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
+To use the feature, one should specify the following (hidden) flags:
+- `--receive.tenant-limits.max-head-series`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
 - `--receive.tenant-limits.meta-monitoring-url`: Specifies Prometheus Query API compatible meta-monitoring endpoint.
 - `--receive.tenant-limits.meta-monitoring-query`: Optional flag to specify PromQL query to execute against meta-monitoring.
 - `--receive.tenant-limits.meta-monitoring-client`: Optional YAML file/string specifying HTTP client config for meta-monitoring.
 
 NOTE:
 - It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times.
-- Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits.
+- Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits and will only log errors for meta-monitoring being unreachable, similarly to when one receiver cannot be scraped.
+- Support for different limit configurations for different tenants is planned for the future.
 
 ## Flags
 
@@ -199,28 +200,6 @@ Flags:
       --receive.tenant-label-name="tenant_id"
                                  Label name through which the tenant will be
                                  announced.
-      --receive.tenant-limits.max-head-series=RECEIVE.TENANT-LIMITS.MAX-HEAD-SERIES
-                                 The total number of active or HEAD series that
-                                 a tenant is allowed to have within a Receive
-                                 topology.
-      --receive.tenant-limits.meta-monitoring-client=
-                                 Alternative to
-                                 'receive.tenant-limits.meta-monitoring-client-file'
-                                 flag (mutually exclusive). Content of YAML file
-                                 or string with http client configs for
-                                 meta-monitoring.
-      --receive.tenant-limits.meta-monitoring-client-file=
-                                 Path to YAML file or string with http client
-                                 configs for meta-monitoring.
- --receive.tenant-limits.meta-monitoring-query="sum(prometheus_tsdb_head_series) by (tenant)" - PromQL Query to execute against - meta-monitoring, to get the current number of - active series for each tenant, across Receive - replicas. - --receive.tenant-limits.meta-monitoring-url=http://localhost:9090 - Meta-monitoring URL which is compatible with - Prometheus Query API for active series - limiting. --remote-write.address="0.0.0.0:19291" Address to listen on for remote write requests. --remote-write.client-server-name="" diff --git a/go.mod b/go.mod index dd4f8dfcaf5..c26c74b0dda 100644 --- a/go.mod +++ b/go.mod @@ -246,6 +246,9 @@ replace ( // Required by Cortex https://github.com/cortexproject/cortex/pull/3051. github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab + // TODO(saswatamcode): Remove when https://github.com/efficientgo/tools/pull/14 is merged. + github.com/efficientgo/tools/extkingpin => github.com/saswatamcode/tools/extkingpin v0.0.0-20220723122803-67d37ea96343 + github.com/vimeo/galaxycache => github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e // Override due to https://github.com/weaveworks/common/issues/239 diff --git a/go.sum b/go.sum index 47a0d21a5d5..466ac849111 100644 --- a/go.sum +++ b/go.sum @@ -302,8 +302,6 @@ github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363/go.mod h1:0Jrqc github.com/efficientgo/tools/core v0.0.0-20210129205121-421d0828c9a6/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M= github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b h1:ZHiD4/yE4idlbqvAO6iYCOYRzOMRpxkW+FKasRA3tsQ= github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M= -github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b h1:rFV4ZGoCKjhOyc4vjrzuCsi9BbrxMJvwmtceN0iR4Zc= -github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4= github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM= @@ -1054,6 +1052,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis= github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4= +github.com/saswatamcode/tools/extkingpin v0.0.0-20220723122803-67d37ea96343 h1:HWf7gTC91QRN18U/ioIjS0QBJmn5cC6KGrurFcOCygc= +github.com/saswatamcode/tools/extkingpin v0.0.0-20220723122803-67d37ea96343/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.9 h1:0roa6gXKgyta64uqh52AQG3wzZXH21unn+ltzQSXML0= diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index f6ba4d42d5a..be7a2e21ea4 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -109,6 +109,21 @@ type Options struct { MetaMonitoringLimitQuery string } +// activeSeriesLimit implements active series limiting for 
web Handler. +type activeSeriesLimit struct { + mtx sync.Mutex + limit uint64 + tenantCurrentSeriesMap map[string]float64 + + metaMonitoringURL *url.URL + metaMonitoringClient *http.Client + metaMonitoringQuery string + + configuredTenantLimit prometheus.Gauge + limitedRequests *prometheus.CounterVec + metaMonitoringErr prometheus.Counter +} + // Handler serves a Prometheus remote write receiving HTTP endpoint. type Handler struct { logger log.Logger @@ -117,21 +132,17 @@ type Handler struct { options *Options listener net.Listener - mtx sync.RWMutex - hashring Hashring - peers *peerGroup - expBackoff backoff.Backoff - peerStates map[string]*retryState - receiverMode ReceiverMode - metaMonitoringClient *http.Client - tenantCurrentSeriesMap map[string]float64 + mtx sync.RWMutex + hashring Hashring + peers *peerGroup + expBackoff backoff.Backoff + peerStates map[string]*retryState + receiverMode ReceiverMode + ActiveSeriesLimit *activeSeriesLimit - forwardRequests *prometheus.CounterVec - replications *prometheus.CounterVec - replicationFactor prometheus.Gauge - configuredTenantLimit prometheus.Gauge - limitedRequests *prometheus.CounterVec - metaMonitoringErr prometheus.Counter + forwardRequests *prometheus.CounterVec + replications *prometheus.CounterVec + replicationFactor prometheus.Gauge writeSamplesTotal *prometheus.HistogramVec writeTimeseriesTotal *prometheus.HistogramVec @@ -178,24 +189,29 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Help: "The number of times to replicate incoming write requests.", }, ), - configuredTenantLimit: promauto.With(registerer).NewGauge( - prometheus.GaugeOpts{ - Name: "thanos_receive_tenant_head_series_limit", - Help: "The configured limit for active or HEAD series of tenants.", - }, - ), - limitedRequests: promauto.With(registerer).NewCounterVec( - prometheus.CounterOpts{ - Name: "thanos_receive_head_series_limited_requests_total", - Help: "The total number of remote write requests that have been dropped due to head series limiting.", - }, []string{"tenant"}, - ), - metaMonitoringErr: promauto.With(registerer).NewCounter( - prometheus.CounterOpts{ - Name: "thanos_receive_metamonitoring_failed_queries_total", - Help: "The total number of meta-monitoring queries that failed while limiting.", - }, - ), + ActiveSeriesLimit: &activeSeriesLimit{ + limit: o.MaxPerTenantLimit, + metaMonitoringURL: o.MetaMonitoringUrl, + metaMonitoringQuery: o.MetaMonitoringLimitQuery, + configuredTenantLimit: promauto.With(registerer).NewGauge( + prometheus.GaugeOpts{ + Name: "thanos_receive_tenant_head_series_limit", + Help: "The configured limit for active (head) series of tenants.", + }, + ), + limitedRequests: promauto.With(registerer).NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_receive_head_series_limited_requests_total", + Help: "The total number of remote write requests that have been dropped due to active series limiting.", + }, []string{"tenant"}, + ), + metaMonitoringErr: promauto.With(registerer).NewCounter( + prometheus.CounterOpts{ + Name: "thanos_receive_metamonitoring_failed_queries_total", + Help: "The total number of meta-monitoring queries that failed while limiting.", + }, + ), + }, writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec( prometheus.HistogramOpts{ Namespace: "thanos", @@ -227,8 +243,8 @@ func NewHandler(logger log.Logger, o *Options) *Handler { h.replicationFactor.Set(1) } - h.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit)) - h.tenantCurrentSeriesMap = map[string]float64{} + 
h.ActiveSeriesLimit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit)) + h.ActiveSeriesLimit.tenantCurrentSeriesMap = map[string]float64{} if (h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor) && h.options.MaxPerTenantLimit != 0 { // Use specified HTTPConfig to make requests to meta-monitoring. @@ -242,7 +258,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { level.Error(h.logger).Log("msg", "parsing http config YAML", "err", err.Error()) } - h.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "thanos-receive") + h.ActiveSeriesLimit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit") if err != nil { level.Error(h.logger).Log("msg", "improper http client config", "err", err.Error()) } @@ -451,7 +467,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { // Impose limits only if Receive is in Router or RouterIngestor mode. if h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor { - under, err := h.isUnderLimit(tenant, tLogger) + under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger) if err != nil { level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) } @@ -549,25 +565,27 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } // QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring -// solution with the configured query for getting current HEAD series of all tenants. +// solution with the configured query for getting current active (head) series of all tenants. // It then populates tenantCurrentSeries map with result. -func (h *Handler) QueryMetaMonitoring(ctx context.Context) error { - c := promclient.NewWithTracingClient(h.logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent) +func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error { + a.mtx.Lock() + defer a.mtx.Unlock() + c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent) - vectorRes, _, err := c.QueryInstant(ctx, h.options.MetaMonitoringUrl, h.options.MetaMonitoringLimitQuery, time.Now(), promclient.QueryOptions{}) + vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{}) if err != nil { - h.metaMonitoringErr.Inc() + a.metaMonitoringErr.Inc() return err } - level.Debug(h.logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes)) + level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes)) // Construct map of tenant name and current HEAD series. for _, e := range vectorRes { for k, v := range e.Metric { if k == "tenant" { - h.tenantCurrentSeriesMap[string(v)] = float64(e.Value) - level.Debug(h.logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value) + a.tenantCurrentSeriesMap[string(v)] = float64(e.Value) + level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value) } } } @@ -577,9 +595,11 @@ func (h *Handler) QueryMetaMonitoring(ctx context.Context) error { // isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit. // It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits. -// TODO(saswatamcode): Add capability to configure diff limits for diff tenants. 
-func (h *Handler) isUnderLimit(tenant string, logger log.Logger) (bool, error) { - if h.options.MaxPerTenantLimit == 0 || h.options.MetaMonitoringUrl.Host == "" { +// TODO(saswatamcode): Add capability to configure different limits for different tenants. +func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) { + a.mtx.Lock() + defer a.mtx.Unlock() + if a.limit == 0 || a.metaMonitoringURL.Host == "" { return true, nil } @@ -588,19 +608,17 @@ func (h *Handler) isUnderLimit(tenant string, logger log.Logger) (bool, error) { // series. As such metric is updated in intervals, it is possible // that Receive ingests more series than the limit, before detecting that // a tenant has exceeded the set limits. - v, ok := h.tenantCurrentSeriesMap[tenant] + v, ok := a.tenantCurrentSeriesMap[tenant] if !ok { return true, errors.New("tenant not in current series map") } - if v >= float64(h.options.MaxPerTenantLimit) { - level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", h.options.MaxPerTenantLimit) - h.limitedRequests.WithLabelValues(tenant).Inc() + if v >= float64(a.limit) { + level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", a.limit) + a.limitedRequests.WithLabelValues(tenant).Inc() return false, nil } - level.Debug(logger).Log("msg", "tenant is under limit", "currentSeries", v) - return true, nil } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 0506e0136b0..9f66f4efbb9 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -606,336 +606,6 @@ func TestReceive(t *testing.T) { }) }) - t.Run("single_active_series_limiting", func(t *testing.T) { - /* - The single_active_series_limiting suite configures one Router and one Ingestor - along with a test avalanche writer and dedicated meta-monitoring. - - ┌──────────┐ - │Avalanche │ - └────┬─────┘ - │ - ┌────▼─────┐ - │Router │─────────────► Meta-monitoring - └────┬─────┘ - │ - ┌────▼─────┐ - │Ingestor │ - └──────────┘ - - NB: Made with asciiflow.com - you can copy & paste the above there to modify. - */ - - t.Parallel() - e, err := e2e.NewDockerEnvironment("e2e_single_active_series_limiting") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // This can be treated as the meta-monitoring service. - meta, err := e2emonitoring.Start(e) - testutil.Ok(t, err) - - // Setup a ingestors. - ingestor := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init() - - h := receive.HashringConfig{ - Endpoints: []string{ - ingestor.InternalEndpoint("grpc"), - }, - } - - // Setup one router with in front of the ingestor. - router := e2ethanos.NewReceiveBuilder(e, "r1").WithRouting(1, h).WithValidationEnabled(5, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - testutil.Ok(t, e2e.StartAndWaitReady(ingestor, router)) - - querier := e2ethanos.NewQuerierBuilder(e, "1", ingestor.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(querier)) - - testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) - - // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So without limiting we end up with 40 timeseries. 
- avalanche := e2ethanos.NewAvalanche(e, "avalanche", - e2ethanos.AvalancheOptions{ - MetricCount: "10", - SeriesCount: "1", - MetricInterval: "30", - SeriesInterval: "3600", - ValueInterval: "3600", - - RemoteURL: e2ethanos.RemoteWriteEndpoint(router.InternalEndpoint("remote-write")), - RemoteWriteInterval: "30s", - RemoteBatchSize: "10", - RemoteRequestCount: "5", - - TenantID: "avalanche-tenant", - }) - - testutil.Ok(t, e2e.StartAndWaitReady(avalanche)) - - // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one request. - testutil.Ok(t, router.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2e.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2e.WaitMissingMetrics())) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - // Here, once we ingest 10 new series, we go above the limit by 5. After this, no other remote_write request is ingested. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "sum(prometheus_tsdb_head_series{tenant=\"avalanche-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_single_active_series_limiting-receive-r1:8080\", job=\"receive-r1\"}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(5), - }, - }) - - // Query meta-monitoring solution to assert that only 10 timeseries have been ingested. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"avalanche-tenant\"})" }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(10), - }, - }) - }) - - t.Run("router_active_series_limiting", func(t *testing.T) { - /* - The router_active_series_limiting suite configures separate routing and ingesting components - along with a test avalanche writer and dedicated meta-monitoring. - - ┌────────────┐ - │ │ - │ Avalanche │ - │ │ - └─────┬──────┘ - │ - ┌─────▼──────┐ - │ │ - ┌───────┤ Router ├───────┬──────► Meta-monitoring - │ │ │ │ - │ └─────┬──────┘ │ - │ │ │ - ┌──────▼─────┐ ┌─────▼──────┐ ┌─────▼──────┐ - │ │ │ │ │ │ - │ Ingestor │ │ Ingestor │ │ Ingestor │ - │ │ │ │ │ │ - └─────┬──────┘ └─────┬──────┘ └──────┬─────┘ - │ │ │ - │ │ │ - │ ┌─────▼──────┐ │ - │ │ │ │ - └────────► Query ◄────────┘ - │ │ - └────────────┘ - - NB: Made with asciiflow.com - you can copy & paste the above there to modify. - */ - - t.Parallel() - e, err := e2e.NewDockerEnvironment("e2e_router_active_series_limiting") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // This can be treated as the meta-monitoring service. - meta, err := e2emonitoring.Start(e) - testutil.Ok(t, err) - - // Setup 3 ingestors. - ingestor1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled().Init() - ingestor2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled().Init() - ingestor3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled().Init() - - h := receive.HashringConfig{ - Endpoints: []string{ - ingestor1.InternalEndpoint("grpc"), - ingestor2.InternalEndpoint("grpc"), - ingestor3.InternalEndpoint("grpc"), - }, - } - - // Setup one router with in front of 3 ingestors. 
- router := e2ethanos.NewReceiveBuilder(e, "r1").WithRouting(1, h).WithValidationEnabled(5, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - testutil.Ok(t, e2e.StartAndWaitReady(ingestor1, ingestor2, ingestor3, router)) - - querier := e2ethanos.NewQuerierBuilder(e, "1", ingestor1.InternalEndpoint("grpc"), ingestor2.InternalEndpoint("grpc"), ingestor3.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(querier)) - - testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) - - // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So without limiting we end up with 40 timeseries. - avalanche := e2ethanos.NewAvalanche(e, "avalanche", - e2ethanos.AvalancheOptions{ - MetricCount: "10", - SeriesCount: "1", - MetricInterval: "30", - SeriesInterval: "3600", - ValueInterval: "3600", - - RemoteURL: e2ethanos.RemoteWriteEndpoint(router.InternalEndpoint("remote-write")), - RemoteWriteInterval: "30s", - RemoteBatchSize: "10", - RemoteRequestCount: "5", - - TenantID: "avalanche-tenant", - }) - - testutil.Ok(t, e2e.StartAndWaitReady(avalanche)) - - // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one request. - testutil.Ok(t, router.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2e.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2e.WaitMissingMetrics())) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - // Here, once we ingest 10 new series, we go above the limit by 5. After this, no other remote_write request is ingested. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "sum(prometheus_tsdb_head_series{tenant=\"avalanche-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_router_active_series_limiting-receive-r1:8080\", job=\"receive-r1\"}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(5), - }, - }) - - // Query meta-monitoring solution to assert that only 10 timeseries have been ingested. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"avalanche-tenant\"})" }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(10), - }, - }) - }) - - t.Run("hashring_active_series_limiting", func(t *testing.T) { - /* - The hashring_active_series_limiting suite configures a hashring with a - avalanche writer and dedicated meta-monitoring. - - ┌──────────┐ - │ │ - │Avalanche │ - │ │ - │ │ - └────┬─────┘ - │ - ┌────▼─────┐ - │ │ - │Router ├────────────────► Meta-monitoring - │Ingestor │ - │ │ - └──▲─┬──▲──┘ - │ │ │ - ┌──────────┐ │ │ │ ┌──────────┐ - │ │ │ │ │ │ │ - │Router ◄───────┘ │ └────────►Router │ - │Ingestor │ │ │Ingestor │ - │ ◄─────────┼───────────► │ - └────┬─────┘ │ └────┬─────┘ - │ │ │ - │ ┌────▼─────┐ │ - │ │ │ │ - └──────────► Query ◄──────────┘ - │ │ - │ │ - └──────────┘ - - NB: Made with asciiflow.com - you can copy & paste the above there to modify. 
- */ - - t.Parallel() - e, err := e2e.NewDockerEnvironment("e2e_hashring_active_series_limiting") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // This can be treated as the meta-monitoring service. - meta, err := e2emonitoring.Start(e) - testutil.Ok(t, err) - - // Setup 3 RouterIngestors with an active series limit of 5. - ingestor1 := e2ethanos.NewReceiveBuilder(e, "i1").WithValidationEnabled(5, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).WithIngestionEnabled() - ingestor2 := e2ethanos.NewReceiveBuilder(e, "i2").WithValidationEnabled(5, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).WithIngestionEnabled() - ingestor3 := e2ethanos.NewReceiveBuilder(e, "i3").WithValidationEnabled(5, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).WithIngestionEnabled() - - h := receive.HashringConfig{ - Endpoints: []string{ - ingestor1.InternalEndpoint("grpc"), - ingestor2.InternalEndpoint("grpc"), - ingestor3.InternalEndpoint("grpc"), - }, - } - - i1Runnable := ingestor1.WithRouting(1, h).Init() - i2Runnable := ingestor2.WithRouting(1, h).Init() - i3Runnable := ingestor3.WithRouting(1, h).Init() - - testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable, i2Runnable, i3Runnable)) - - querier := e2ethanos.NewQuerierBuilder(e, "1", ingestor1.InternalEndpoint("grpc"), ingestor2.InternalEndpoint("grpc"), ingestor3.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(querier)) - - testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) - - // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So without limiting we end up with 40 timeseries. - avalanche := e2ethanos.NewAvalanche(e, "avalanche", - e2ethanos.AvalancheOptions{ - MetricCount: "10", - SeriesCount: "1", - MetricInterval: "30", - SeriesInterval: "3600", - ValueInterval: "3600", - - RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), - RemoteWriteInterval: "30s", - RemoteBatchSize: "10", - RemoteRequestCount: "5", - - TenantID: "avalanche-tenant", - }) - - testutil.Ok(t, e2e.StartAndWaitReady(avalanche)) - - // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request. - testutil.Ok(t, i1Runnable.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2e.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2e.WaitMissingMetrics())) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - // Here, once we ingest 10 new series, we go above the limit by 5. After this, no other remote_write request is ingested. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "sum(prometheus_tsdb_head_series{tenant=\"avalanche-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_hashring_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(5), - }, - }) - - // Query meta-monitoring solution to assert that only 10 timeseries have been ingested. 
- queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"avalanche-tenant\"})" }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(10), - }, - }) - }) - t.Run("multitenant_active_series_limiting", func(t *testing.T) { /* @@ -1114,193 +784,4 @@ func TestReceive(t *testing.T) { }, }) }) - - t.Run("out_of_sync_active_series_limiting", func(t *testing.T) { - - /* - The out_of_sync_active_series_limiting suite configures a hashring with - two avalanche writers and dedicated meta-monitoring. But it starts Receive - instances at different times, so that meta-monitoring queries occur at different - intervals for each. - - ┌──────────┐ ┌──────────┐ - │ │ │ │ - │Avalanche │ │Avalanche │ - │ │ │ │ - │ │ │ │ - └──────────┴──────────┐ ┌──────────┴──────────┘ - │ │ - ┌─▼─────▼──┐ - │ │ - │Router ├────────────────► Meta-monitoring - │Ingestor │ - │ │ - └──▲─┬──▲──┘ - │ │ │ - ┌──────────┐ │ │ │ ┌──────────┐ - │ │ │ │ │ │ │ - │Router ◄───────┘ │ └────────►Router │ - │Ingestor │ │ │Ingestor │ - │ ◄─────────┼───────────► │ - └────┬─────┘ │ └────┬─────┘ - │ │ │ - │ ┌────▼─────┐ │ - │ │ │ │ - └──────────► Query ◄──────────┘ - │ │ - │ │ - └──────────┘ - - NB: Made with asciiflow.com - you can copy & paste the above there to modify. - */ - - t.Parallel() - e, err := e2e.NewDockerEnvironment("e2e_out_of_sync_active_series_limiting") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - // This can be treated as the meta-monitoring service. - meta, err := e2emonitoring.Start(e) - testutil.Ok(t, err) - - // Setup 3 RouterIngestors with a limit of 10 active series. - ingestor1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled() - ingestor2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled() - ingestor3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled() - - h := receive.HashringConfig{ - Endpoints: []string{ - ingestor1.InternalEndpoint("grpc"), - ingestor2.InternalEndpoint("grpc"), - ingestor3.InternalEndpoint("grpc"), - }, - } - - i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() - - testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable)) - - time.Sleep(7 * time.Second) - - testutil.Ok(t, e2e.StartAndWaitReady(i2Runnable)) - - time.Sleep(12 * time.Second) - - testutil.Ok(t, e2e.StartAndWaitReady(i3Runnable)) - - querier := e2ethanos.NewQuerierBuilder(e, "1", ingestor1.InternalEndpoint("grpc"), ingestor2.InternalEndpoint("grpc"), ingestor3.InternalEndpoint("grpc")).Init() - testutil.Ok(t, e2e.StartAndWaitReady(querier)) - - testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) - - // We run two avalanches, one tenant which exceeds the limit, and one tenant which remains under it. - - // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. - // One request always fails due to TSDB not being ready for new tenant. 
- // So without limiting we end up with 40 timeseries and 40 samples. - avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1", - e2ethanos.AvalancheOptions{ - MetricCount: "10", - SeriesCount: "1", - MetricInterval: "30", - SeriesInterval: "3600", - ValueInterval: "3600", - - RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), - RemoteWriteInterval: "30s", - RemoteBatchSize: "10", - RemoteRequestCount: "5", - - TenantID: "exceed-tenant", - }) - - // Avalanche in this configuration, would send 5 requests each with 5 of the same timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So we end up with 5 timeseries, 20 samples. - avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2", - e2ethanos.AvalancheOptions{ - MetricCount: "5", - SeriesCount: "1", - MetricInterval: "3600", - SeriesInterval: "3600", - ValueInterval: "3600", - - RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), - RemoteWriteInterval: "30s", - RemoteBatchSize: "5", - RemoteRequestCount: "5", - - TenantID: "under-tenant", - }) - - testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2)) - - // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request. - // 3 limited requests belong to the exceed-tenant. - testutil.Ok(t, i1Runnable.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2e.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2e.WaitMissingMetrics())) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - t.Cleanup(cancel) - - // Here for exceed-tenant we go above limit by 10, which results in 0 value. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_out_of_sync_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(0), - }, - }) - - // For under-tenant we stay at -5, as we have only pushed 5 series. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { - return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_out_of_sync_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(-5), - }, - }) - - // Query meta-monitoring solution to assert that only 10 timeseries have been ingested for exceed-tenant. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"})" }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(10), - }, - }) - - // Query meta-monitoring solution to assert that only 5 timeseries have been ingested for under-tenant. 
- queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"})" }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{}, - Value: model.SampleValue(5), - }, - }) - - // Query meta-monitoring solution to assert that 3 requests were limited for exceed-tenant and none for under-tenant. - queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "thanos_receive_head_series_limited_requests_total" }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, model.Vector{ - &model.Sample{ - Metric: model.Metric{ - "__name__": "thanos_receive_head_series_limited_requests_total", - "instance": "e2e_out_of_sync_active_series_limiting-receive-i1:8080", - "job": "receive-i1", - "tenant": "exceed-tenant", - }, - Value: model.SampleValue(3), - }, - }) - }) }
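For illustration, here is a minimal, self-contained sketch (not part of this diff) of the interaction the limiting feature described above depends on: issuing the default `sum(prometheus_tsdb_head_series) by (tenant)` instant query against a Prometheus Query API compatible meta-monitoring endpoint, and turning the resulting vector into the per-tenant head-series map that the configured limit is checked against. The endpoint URL, the example limit value, and the plain `net/http`/`encoding/json` client are assumptions made for the sketch; the handler code in this change uses Thanos's `promclient` and `httpconfig` packages instead.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"time"
)

// instantQueryResponse mirrors the subset of the Prometheus /api/v1/query
// response needed to read an instant-vector result.
type instantQueryResponse struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string `json:"resultType"`
		Result     []struct {
			Metric map[string]string `json:"metric"`
			Value  [2]interface{}    `json:"value"` // [unix timestamp, string value]
		} `json:"result"`
	} `json:"data"`
}

func main() {
	// Assumed meta-monitoring endpoint; replace with your own Prometheus-compatible URL.
	metaMonitoringURL := "http://localhost:9090"
	// Default limiting query used by Thanos Receive.
	query := "sum(prometheus_tsdb_head_series) by (tenant)"

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get(metaMonitoringURL + "/api/v1/query?query=" + url.QueryEscape(query))
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	defer resp.Body.Close()

	var res instantQueryResponse
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		fmt.Println("decoding response failed:", err)
		return
	}
	if res.Status != "success" {
		fmt.Println("unexpected query status:", res.Status)
		return
	}

	// Build the tenant -> current head series map, the same shape as the
	// limiting cache described above.
	tenantCurrentSeries := map[string]float64{}
	for _, sample := range res.Data.Result {
		tenant, ok := sample.Metric["tenant"]
		if !ok {
			continue
		}
		raw, ok := sample.Value[1].(string)
		if !ok {
			continue
		}
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			continue
		}
		tenantCurrentSeries[tenant] = v
	}

	// A tenant's remote write requests start failing once its cached value
	// reaches the configured --receive.tenant-limits.max-head-series.
	limit := 10000.0 // assumed example limit
	for tenant, series := range tenantCurrentSeries {
		fmt.Printf("tenant=%s series=%.0f underLimit=%t\n", tenant, series, series < limit)
	}
}
```

Running such a query by hand is also a quick way to verify that the meta-monitoring endpoint actually exposes a `tenant` label, which the limiting cache keys on.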