Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query-scheduler querier inflight requests: convert to summary metric #8417

11 changes: 7 additions & 4 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
Name: "cortex_query_frontend_enqueue_duration_seconds",
Help: "Time spent by requests waiting to join the queue or be rejected.",
})
querierInflightRequests := promauto.With(registerer).NewGaugeVec(
prometheus.GaugeOpts{
Name: "cortex_query_frontend_querier_inflight_requests",
Help: "Number of inflight requests being processed on all querier-scheduler connections.",
querierInflightRequests := promauto.With(registerer).NewSummaryVec(
prometheus.SummaryOpts{
Name: "cortex_query_scheduler_querier_inflight_requests",
Help: "Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
},
[]string{"query_component"},
)
Expand Down
11 changes: 6 additions & 5 deletions pkg/scheduler/queue/query_component_utilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type QueryComponentUtilization struct {
storeGatewayInflightRequests int
querierInflightRequestsTotal int

querierInflightRequestsGauge *prometheus.GaugeVec
querierInflightRequestsMetric *prometheus.SummaryVec
}

// DefaultReservedQueryComponentCapacity reserves 1 / 3 of querier-worker connections
Expand All @@ -73,7 +73,7 @@ const MaxReservedQueryComponentCapacity = 0.5

func NewQueryComponentUtilization(
targetReservedCapacity float64,
querierInflightRequests *prometheus.GaugeVec,
querierInflightRequestsMetric *prometheus.SummaryVec,
) (*QueryComponentUtilization, error) {

if targetReservedCapacity >= MaxReservedQueryComponentCapacity {
Expand All @@ -87,7 +87,7 @@ func NewQueryComponentUtilization(
storeGatewayInflightRequests: 0,
querierInflightRequestsTotal: 0,

querierInflightRequestsGauge: querierInflightRequests,
querierInflightRequestsMetric: querierInflightRequestsMetric,
}, nil
}

Expand Down Expand Up @@ -163,12 +163,13 @@ func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryCompon

if isIngester {
qcl.ingesterInflightRequests += increment
qcl.querierInflightRequestsGauge.WithLabelValues(string(Ingester)).Add(float64(increment))
}
if isStoreGateway {
qcl.storeGatewayInflightRequests += increment
qcl.querierInflightRequestsGauge.WithLabelValues(string(StoreGateway)).Add(float64(increment))
}

qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests))
qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests))
qcl.querierInflightRequestsTotal += increment
}

Expand Down
17 changes: 12 additions & 5 deletions pkg/scheduler/queue/query_component_utilization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@ package queue

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/stretchr/testify/require"
)

func testQuerierInflightRequestsGauge() *prometheus.GaugeVec {
return promauto.With(prometheus.NewPedanticRegistry()).NewGaugeVec(prometheus.GaugeOpts{
Name: "test_query_scheduler_querier_inflight_requests",
Help: "[test] Number of inflight requests being processed on a querier-scheduler connection.",
}, []string{"query_component"})
func testQuerierInflightRequestsGauge() *prometheus.SummaryVec {
return promauto.With(prometheus.NewPedanticRegistry()).NewSummaryVec(
prometheus.SummaryOpts{
Name: "test_cortex_query_scheduler_querier_inflight_requests",
Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
},
[]string{"query_component"},
)
}

func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ func NewRequestQueue(
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
enqueueDuration prometheus.Histogram,
querierInflightRequestsGauge *prometheus.GaugeVec,
querierInflightRequestsMetric *prometheus.SummaryVec,
) (*RequestQueue, error) {
queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsGauge)
queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsMetric)
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

Expand Down Expand Up @@ -239,7 +239,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(b, err)

Expand Down Expand Up @@ -411,7 +411,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

Expand Down Expand Up @@ -480,7 +480,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

Expand Down Expand Up @@ -564,7 +564,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

Expand Down Expand Up @@ -605,7 +605,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

Expand Down Expand Up @@ -634,7 +634,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

Expand Down
14 changes: 9 additions & 5 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,17 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_enqueue_duration_seconds",
Help: "Time spent by requests waiting to join the queue or be rejected.",
})
querierInflightRequestsGauge := promauto.With(registerer).NewGaugeVec(
prometheus.GaugeOpts{
Name: "cortex_query_scheduler_querier_inflight_requests",
Help: "Number of inflight requests being processed on all querier-scheduler connections.",
querierInflightRequestsMetric := promauto.With(registerer).NewSummaryVec(
prometheus.SummaryOpts{
Name: "cortex_query_scheduler_querier_inflight_requests",
Help: "Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
},
[]string{"query_component"},
)

s.requestQueue, err = queue.NewRequestQueue(
s.log,
cfg.MaxOutstandingPerTenant,
Expand All @@ -160,7 +164,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
s.queueLength,
s.discardedRequests,
enqueueDuration,
querierInflightRequestsGauge,
querierInflightRequestsMetric,
)
if err != nil {
return nil, err
Expand Down
Loading