query-scheduler querier inflight requests: convert to summary metric #8417

11 changes: 7 additions & 4 deletions pkg/frontend/v1/frontend.go
@@ -113,10 +113,13 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
Name: "cortex_query_frontend_enqueue_duration_seconds",
Help: "Time spent by requests waiting to join the queue or be rejected.",
})
querierInflightRequests := promauto.With(registerer).NewGaugeVec(
prometheus.GaugeOpts{
Name: "cortex_query_frontend_querier_inflight_requests",
Help: "Number of inflight requests being processed on all querier-scheduler connections.",
querierInflightRequests := promauto.With(registerer).NewSummaryVec(
prometheus.SummaryOpts{
Name: "cortex_query_frontend_querier_inflight_requests",
Help: "Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
},
[]string{"query_component"},
)
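The key behavioural change in this file: the per-component inflight count is no longer a gauge that is incremented and decremented per request, but a summary fed by periodically observing the current count, so its quantiles describe how inflight requests were distributed over the sliding 60s window (6 age buckets of 10s each). Below is a minimal, standalone sketch of that summary behaviour; the metric name and observed values are illustrative only, not taken from this PR.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same shape as the new metric: quantile objectives over a sliding
	// 60s window, split into 6 age buckets of 10s each.
	inflight := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name:       "demo_querier_inflight_requests",
		Help:       "Sampled inflight requests per query component (illustrative).",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		MaxAge:     time.Minute,
		AgeBuckets: 6,
	}, []string{"query_component"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(inflight)

	// Unlike a gauge's Add/Sub, each Observe records a sample of the
	// current inflight count; the quantiles summarize those samples.
	for i := 0; i < 10; i++ {
		inflight.WithLabelValues("ingester").Observe(float64(i))
	}

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetSummary())
	}
}
```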
58 changes: 48 additions & 10 deletions pkg/scheduler/queue/query_component_utilization.go
@@ -4,6 +4,7 @@ package queue

import (
"math"
"sync"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -52,11 +53,15 @@ type QueryComponentUtilization struct {
// for queries to the less-loaded query component when the query queue becomes backlogged.
targetReservedCapacity float64

inflightRequestsMu sync.RWMutex
// inflightRequests tracks requests from the time the request was successfully sent to a querier
// to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect.
inflightRequests map[RequestKey]*SchedulerRequest
ingesterInflightRequests int
storeGatewayInflightRequests int
querierInflightRequestsTotal int

querierInflightRequestsGauge *prometheus.GaugeVec
querierInflightRequestsMetric *prometheus.SummaryVec
}

// DefaultReservedQueryComponentCapacity reserves 1 / 3 of querier-worker connections
@@ -73,7 +78,7 @@ const MaxReservedQueryComponentCapacity = 0.5

func NewQueryComponentUtilization(
targetReservedCapacity float64,
querierInflightRequests *prometheus.GaugeVec,
querierInflightRequestsMetric *prometheus.SummaryVec,
) (*QueryComponentUtilization, error) {

if targetReservedCapacity >= MaxReservedQueryComponentCapacity {
@@ -83,11 +88,12 @@ func NewQueryComponentUtilization(
return &QueryComponentUtilization{
targetReservedCapacity: targetReservedCapacity,

inflightRequests: map[RequestKey]*SchedulerRequest{},
ingesterInflightRequests: 0,
storeGatewayInflightRequests: 0,
querierInflightRequestsTotal: 0,

querierInflightRequestsGauge: querierInflightRequests,
querierInflightRequestsMetric: querierInflightRequestsMetric,
}, nil
}

@@ -135,6 +141,8 @@ func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName(
}

isIngester, isStoreGateway := queryComponentFlags(name)
qcl.inflightRequestsMu.RLock()
defer qcl.inflightRequestsMu.RUnlock()
if isIngester {
if connectedWorkers-(qcl.ingesterInflightRequests) <= minReservedConnections {
return true, Ingester
@@ -148,32 +156,62 @@
return false, ""
}

// IncrementForComponentName is called when a request is sent to a querier
func (qcl *QueryComponentUtilization) IncrementForComponentName(expectedQueryComponent string) {
// MarkRequestSent is called when a request is sent to a querier
func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest) {
if req != nil {
qcl.inflightRequestsMu.Lock()
defer qcl.inflightRequestsMu.Unlock()

qcl.inflightRequests[req.Key()] = req
qcl.incrementForComponentName(req.ExpectedQueryComponentName())
}
}

// MarkRequestCompleted is called when a querier completes or fails a request
func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest) {
if req != nil {
qcl.inflightRequestsMu.Lock()
defer qcl.inflightRequestsMu.Unlock()

reqKey := req.Key()
if req, ok := qcl.inflightRequests[reqKey]; ok {
qcl.decrementForComponentName(req.ExpectedQueryComponentName())
}
delete(qcl.inflightRequests, reqKey)
}
}

func (qcl *QueryComponentUtilization) incrementForComponentName(expectedQueryComponent string) {
qcl.updateForComponentName(expectedQueryComponent, 1)
}

// DecrementForComponentName is called when a querier completes or fails a request
func (qcl *QueryComponentUtilization) DecrementForComponentName(expectedQueryComponent string) {
func (qcl *QueryComponentUtilization) decrementForComponentName(expectedQueryComponent string) {
qcl.updateForComponentName(expectedQueryComponent, -1)
}

func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryComponent string, increment int) {
isIngester, isStoreGateway := queryComponentFlags(expectedQueryComponent)

// the lock is expected to be held by the calling method that marks the request as sent or completed
if isIngester {
qcl.ingesterInflightRequests += increment
qcl.querierInflightRequestsGauge.WithLabelValues(string(Ingester)).Add(float64(increment))
}
if isStoreGateway {
qcl.storeGatewayInflightRequests += increment
qcl.querierInflightRequestsGauge.WithLabelValues(string(StoreGateway)).Add(float64(increment))
}
qcl.querierInflightRequestsTotal += increment
}

func (qcl *QueryComponentUtilization) ObserveInflightRequests() {
qcl.inflightRequestsMu.RLock()
defer qcl.inflightRequestsMu.RUnlock()
qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests))
qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests))
}

// GetForComponent is a test-only util
func (qcl *QueryComponentUtilization) GetForComponent(component QueryComponent) int {
qcl.inflightRequestsMu.RLock()
defer qcl.inflightRequestsMu.RUnlock()
switch component {
case Ingester:
return qcl.ingesterInflightRequests
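Taken together, the changes in this file move inflight-request bookkeeping behind a mutex (MarkRequestSent / MarkRequestCompleted) and decouple it from metric updates, which now happen only when ObserveInflightRequests samples the counters. A condensed, standalone sketch of that pattern is below; the tracker type and component names are simplified stand-ins, not the real QueryComponentUtilization.

```go
package main

import (
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// tracker is a simplified stand-in for QueryComponentUtilization:
// counters are protected by a mutex and periodically sampled into a summary.
type tracker struct {
	mu       sync.RWMutex
	inflight map[string]int // inflight requests per query component

	summary *prometheus.SummaryVec
}

func (t *tracker) markSent(component string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inflight[component]++
}

func (t *tracker) markCompleted(component string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inflight[component]--
}

// observe samples the current counts under a read lock,
// analogous to ObserveInflightRequests being driven by a ticker.
func (t *tracker) observe() {
	t.mu.RLock()
	defer t.mu.RUnlock()
	for component, n := range t.inflight {
		t.summary.WithLabelValues(component).Observe(float64(n))
	}
}

func main() {
	t := &tracker{
		inflight: map[string]int{},
		summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{
			Name:       "demo_inflight_requests",
			MaxAge:     time.Minute,
			AgeBuckets: 6,
		}, []string{"query_component"}),
	}

	t.markSent("ingester")
	t.markSent("store-gateway")
	t.observe() // in the real queue this runs every 250ms from running()
	t.markCompleted("ingester")
	t.observe()
}
```

Sampling under a read lock keeps the hot path (marking a request sent or completed) cheap, while the periodic sampler pays the cost of the metric observation.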
31 changes: 24 additions & 7 deletions pkg/scheduler/queue/query_component_utilization_test.go
@@ -4,17 +4,24 @@ package queue

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/stretchr/testify/require"
)

func testQuerierInflightRequestsGauge() *prometheus.GaugeVec {
return promauto.With(prometheus.NewPedanticRegistry()).NewGaugeVec(prometheus.GaugeOpts{
Name: "test_query_scheduler_querier_inflight_requests",
Help: "[test] Number of inflight requests being processed on a querier-scheduler connection.",
}, []string{"query_component"})
func testQuerierInflightRequestsGauge() *prometheus.SummaryVec {
return promauto.With(prometheus.NewPedanticRegistry()).NewSummaryVec(
prometheus.SummaryOpts{
Name: "test_cortex_query_scheduler_querier_inflight_requests",
Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
},
[]string{"query_component"},
)
}

func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) {
@@ -216,11 +223,21 @@
require.NoError(t, err)

for i := 0; i < testCase.ingesterInflightRequests; i++ {
queryComponentUtilization.IncrementForComponentName(ingesterQueueDimension)
ingesterInflightRequest := &SchedulerRequest{
FrontendAddr: "frontend-a",
QueryID: uint64(i),
AdditionalQueueDimensions: []string{ingesterQueueDimension},
}
queryComponentUtilization.MarkRequestSent(ingesterInflightRequest)
}

for i := 0; i < testCase.storeGatewayInflightRequests; i++ {
queryComponentUtilization.IncrementForComponentName(storeGatewayQueueDimension)
storeGatewayInflightRequest := &SchedulerRequest{
FrontendAddr: "frontend-b",
QueryID: uint64(i),
AdditionalQueueDimensions: []string{storeGatewayQueueDimension},
}
queryComponentUtilization.MarkRequestSent(storeGatewayInflightRequest)
}

exceedsThreshold, queryComponent := queryComponentUtilization.ExceedsThresholdForComponentName(
75 changes: 27 additions & 48 deletions pkg/scheduler/queue/queue.go
@@ -133,9 +133,6 @@ type RequestQueue struct {
stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request.
stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped.

// querierInflightRequests tracks requests from the time the request was successfully sent to a querier
// to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect.
querierInflightRequests map[RequestKey]*SchedulerRequest
requestsToEnqueue chan requestToEnqueue
requestsSent chan *SchedulerRequest
requestsCompleted chan *SchedulerRequest
@@ -181,9 +178,9 @@ func NewRequestQueue(
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
enqueueDuration prometheus.Histogram,
querierInflightRequestsGauge *prometheus.GaugeVec,
querierInflightRequestsMetric *prometheus.SummaryVec,
) (*RequestQueue, error) {
queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsGauge)
queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsMetric)
if err != nil {
return nil, err
}
@@ -206,7 +203,6 @@ func NewRequestQueue(
stopCompleted: make(chan struct{}),

requestsToEnqueue: make(chan requestToEnqueue),
querierInflightRequests: map[RequestKey]*SchedulerRequest{},
requestsSent: make(chan *SchedulerRequest),
requestsCompleted: make(chan *SchedulerRequest),
querierOperations: make(chan querierOperation),
@@ -217,7 +213,7 @@
queueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, forgetDelay),
}

q.Service = services.NewTimerService(forgetCheckPeriod, q.starting, q.forgetDisconnectedQueriers, q.stop).WithName("request queue")
q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue")

return q, nil
}
@@ -229,6 +225,29 @@ func (q *RequestQueue) starting(_ context.Context) error {
return nil
}

func (q *RequestQueue) running(ctx context.Context) error {
// periodically submit a message to dispatcherLoop to forget disconnected queriers
forgetDisconnectedQueriersTicker := time.NewTicker(forgetCheckPeriod)
defer forgetDisconnectedQueriersTicker.Stop()

// periodically sample inflight requests into the summary metric;
// as in the scheduler, we observe frequently and at regular intervals
// so the summary's quantiles closely track how inflight requests vary over time.
inflightRequestsTicker := time.NewTicker(250 * time.Millisecond)
defer inflightRequestsTicker.Stop()

for {
select {
case <-forgetDisconnectedQueriersTicker.C:
q.submitForgetDisconnectedQueriers(ctx)
case <-inflightRequestsTicker.C:
q.QueryComponentUtilization.ObserveInflightRequests()
case <-ctx.Done():
return nil
}
}
}

func (q *RequestQueue) dispatcherLoop() {
stopping := false

@@ -248,10 +267,6 @@
if err == nil {
needToDispatchQueries = true
}
case sentReq := <-q.requestsSent:
q.processRequestSent(sentReq)
case completedReq := <-q.requestsCompleted:
q.processRequestCompleted(completedReq)
case waitingConn := <-q.waitingQuerierConns:
requestSent := q.trySendNextRequestForQuerier(waitingConn)
if !requestSent {
@@ -470,9 +485,8 @@ func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
}

func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
func (q *RequestQueue) submitForgetDisconnectedQueriers(_ context.Context) {
q.submitQuerierOperation("", forgetDisconnected)
return nil
}

func (q *RequestQueue) SubmitRegisterQuerierConnection(querierID string) {
@@ -534,41 +548,6 @@ func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) {
return q.queueBroker.forgetDisconnectedQueriers(time.Now())
}

func (q *RequestQueue) SubmitRequestSent(req *SchedulerRequest) {
if req != nil {
select {
case q.requestsSent <- req:
case <-q.stopCompleted:
}
}
}

func (q *RequestQueue) processRequestSent(req *SchedulerRequest) {
if req != nil {
q.querierInflightRequests[req.Key()] = req
q.QueryComponentUtilization.IncrementForComponentName(req.ExpectedQueryComponentName())
}
}

func (q *RequestQueue) SubmitRequestCompleted(req *SchedulerRequest) {
if req != nil {
select {
case q.requestsCompleted <- req:
case <-q.stopCompleted:
}
}
}

func (q *RequestQueue) processRequestCompleted(req *SchedulerRequest) {
if req != nil {
reqKey := req.Key()
if req, ok := q.querierInflightRequests[reqKey]; ok {
q.QueryComponentUtilization.DecrementForComponentName(req.ExpectedQueryComponentName())
}
delete(q.querierInflightRequests, reqKey)
}
}

// waitingQuerierConn is a "request" indicating that the querier is ready to receive the next query request.
// It embeds the unbuffered `recvChan` to receive the requestForQuerier "response" from the RequestQueue.
// The request/response terminology is avoided in naming to disambiguate with the actual query requests.
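One more note on this file: the switch from services.NewTimerService (a single iteration function on one fixed interval) to services.NewBasicService with a running() loop is what lets the queue drive both the forget-disconnected-queriers check and the new 250ms inflight-request sampling from one goroutine. A minimal, standalone sketch of that pattern, assuming grafana/dskit's services package behaves as used in the diff; the intervals, names, and ticker bodies here are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/services"
)

func main() {
	// running multiplexes several tickers in one loop, which a TimerService
	// (one iteration function, one interval) cannot express directly.
	running := func(ctx context.Context) error {
		forget := time.NewTicker(time.Second) // placeholder for forgetCheckPeriod
		sample := time.NewTicker(250 * time.Millisecond)
		defer forget.Stop()
		defer sample.Stop()

		for {
			select {
			case <-forget.C:
				fmt.Println("forget disconnected queriers") // placeholder work
			case <-sample.C:
				fmt.Println("observe inflight requests") // placeholder work
			case <-ctx.Done():
				return nil
			}
		}
	}

	svc := services.NewBasicService(nil, running, nil).WithName("demo request queue")
	if err := services.StartAndAwaitRunning(context.Background(), svc); err != nil {
		panic(err)
	}
	time.Sleep(2 * time.Second)
	_ = services.StopAndAwaitTerminated(context.Background(), svc)
}
```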
14 changes: 7 additions & 7 deletions pkg/scheduler/queue/queue_test.go
@@ -134,7 +134,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

@@ -239,7 +239,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(b, err)

@@ -411,7 +411,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

@@ -480,7 +480,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

@@ -564,7 +564,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

@@ -605,7 +605,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)

@@ -634,7 +634,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}),
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}),
)
require.NoError(t, err)
