diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 5270af11801..a804b95057c 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -113,10 +113,13 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist Name: "cortex_query_frontend_enqueue_duration_seconds", Help: "Time spent by requests waiting to join the queue or be rejected.", }) - querierInflightRequests := promauto.With(registerer).NewGaugeVec( - prometheus.GaugeOpts{ - Name: "cortex_query_frontend_querier_inflight_requests", - Help: "Number of inflight requests being processed on all querier-scheduler connections.", + querierInflightRequests := promauto.With(registerer).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "cortex_query_frontend_querier_inflight_requests", + Help: "Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, }, []string{"query_component"}, ) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index 9a6e9555781..320243c1878 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -4,6 +4,7 @@ package queue import ( "math" + "sync" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -52,11 +53,15 @@ type QueryComponentUtilization struct { // for queries to the less-loaded query component when the query queue becomes backlogged. targetReservedCapacity float64 + inflightRequestsMu sync.RWMutex + // inflightRequests tracks requests from the time the request was successfully sent to a querier + // to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect. 
+ inflightRequests map[RequestKey]*SchedulerRequest ingesterInflightRequests int storeGatewayInflightRequests int querierInflightRequestsTotal int - querierInflightRequestsGauge *prometheus.GaugeVec + querierInflightRequestsMetric *prometheus.SummaryVec } // DefaultReservedQueryComponentCapacity reserves 1 / 3 of querier-worker connections @@ -73,7 +78,7 @@ const MaxReservedQueryComponentCapacity = 0.5 func NewQueryComponentUtilization( targetReservedCapacity float64, - querierInflightRequests *prometheus.GaugeVec, + querierInflightRequestsMetric *prometheus.SummaryVec, ) (*QueryComponentUtilization, error) { if targetReservedCapacity >= MaxReservedQueryComponentCapacity { @@ -83,11 +88,12 @@ func NewQueryComponentUtilization( return &QueryComponentUtilization{ targetReservedCapacity: targetReservedCapacity, + inflightRequests: map[RequestKey]*SchedulerRequest{}, ingesterInflightRequests: 0, storeGatewayInflightRequests: 0, querierInflightRequestsTotal: 0, - querierInflightRequestsGauge: querierInflightRequests, + querierInflightRequestsMetric: querierInflightRequestsMetric, }, nil } @@ -135,6 +141,8 @@ func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName( } isIngester, isStoreGateway := queryComponentFlags(name) + qcl.inflightRequestsMu.RLock() + defer qcl.inflightRequestsMu.RUnlock() if isIngester { if connectedWorkers-(qcl.ingesterInflightRequests) <= minReservedConnections { return true, Ingester @@ -148,32 +156,62 @@ func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName( return false, "" } -// IncrementForComponentName is called when a request is sent to a querier -func (qcl *QueryComponentUtilization) IncrementForComponentName(expectedQueryComponent string) { +// MarkRequestSent is called when a request is sent to a querier +func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest) { + if req != nil { + qcl.inflightRequestsMu.Lock() + defer qcl.inflightRequestsMu.Unlock() + + 
qcl.inflightRequests[req.Key()] = req + qcl.incrementForComponentName(req.ExpectedQueryComponentName()) + } +} + +// MarkRequestCompleted is called when a querier completes or fails a request +func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest) { + if req != nil { + qcl.inflightRequestsMu.Lock() + defer qcl.inflightRequestsMu.Unlock() + + reqKey := req.Key() + if req, ok := qcl.inflightRequests[reqKey]; ok { + qcl.decrementForComponentName(req.ExpectedQueryComponentName()) + } + delete(qcl.inflightRequests, reqKey) + } +} + +func (qcl *QueryComponentUtilization) incrementForComponentName(expectedQueryComponent string) { qcl.updateForComponentName(expectedQueryComponent, 1) } -// DecrementForComponentName is called when a querier completes or fails a request -func (qcl *QueryComponentUtilization) DecrementForComponentName(expectedQueryComponent string) { +func (qcl *QueryComponentUtilization) decrementForComponentName(expectedQueryComponent string) { qcl.updateForComponentName(expectedQueryComponent, -1) } func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryComponent string, increment int) { isIngester, isStoreGateway := queryComponentFlags(expectedQueryComponent) - + // lock is expected to be obtained by the calling method to mark the request as sent or completed if isIngester { qcl.ingesterInflightRequests += increment - qcl.querierInflightRequestsGauge.WithLabelValues(string(Ingester)).Add(float64(increment)) } if isStoreGateway { qcl.storeGatewayInflightRequests += increment - qcl.querierInflightRequestsGauge.WithLabelValues(string(StoreGateway)).Add(float64(increment)) } qcl.querierInflightRequestsTotal += increment } +func (qcl *QueryComponentUtilization) ObserveInflightRequests() { + qcl.inflightRequestsMu.RLock() + defer qcl.inflightRequestsMu.RUnlock() + qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests)) + 
qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests)) +} + // GetForComponent is a test-only util func (qcl *QueryComponentUtilization) GetForComponent(component QueryComponent) int { + qcl.inflightRequestsMu.RLock() + defer qcl.inflightRequestsMu.RUnlock() switch component { case Ingester: return qcl.ingesterInflightRequests diff --git a/pkg/scheduler/queue/query_component_utilization_test.go b/pkg/scheduler/queue/query_component_utilization_test.go index d025ba9e4ff..44bfb2ec82f 100644 --- a/pkg/scheduler/queue/query_component_utilization_test.go +++ b/pkg/scheduler/queue/query_component_utilization_test.go @@ -4,17 +4,24 @@ package queue import ( "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/require" ) -func testQuerierInflightRequestsGauge() *prometheus.GaugeVec { - return promauto.With(prometheus.NewPedanticRegistry()).NewGaugeVec(prometheus.GaugeOpts{ - Name: "test_query_scheduler_querier_inflight_requests", - Help: "[test] Number of inflight requests being processed on a querier-scheduler connection.", - }, []string{"query_component"}) +func testQuerierInflightRequestsGauge() *prometheus.SummaryVec { + return promauto.With(prometheus.NewPedanticRegistry()).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "test_cortex_query_scheduler_querier_inflight_requests", + Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. 
Quantile buckets keep track of inflight requests over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, + }, + []string{"query_component"}, + ) } func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) { @@ -216,11 +223,21 @@ func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) { require.NoError(t, err) for i := 0; i < testCase.ingesterInflightRequests; i++ { - queryComponentUtilization.IncrementForComponentName(ingesterQueueDimension) + ingesterInflightRequest := &SchedulerRequest{ + FrontendAddr: "frontend-a", + QueryID: uint64(i), + AdditionalQueueDimensions: []string{ingesterQueueDimension}, + } + queryComponentUtilization.MarkRequestSent(ingesterInflightRequest) } for i := 0; i < testCase.storeGatewayInflightRequests; i++ { - queryComponentUtilization.IncrementForComponentName(storeGatewayQueueDimension) + storeGatewayInflightRequest := &SchedulerRequest{ + FrontendAddr: "frontend-b", + QueryID: uint64(i), + AdditionalQueueDimensions: []string{storeGatewayQueueDimension}, + } + queryComponentUtilization.MarkRequestSent(storeGatewayInflightRequest) } exceedsThreshold, queryComponent := queryComponentUtilization.ExceedsThresholdForComponentName( diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 7cf73afbf29..fc2576fca6b 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -133,9 +133,6 @@ type RequestQueue struct { stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. - // querierInflightRequests tracks requests from the time the request was successfully sent to a querier - // to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect. 
- querierInflightRequests map[RequestKey]*SchedulerRequest requestsToEnqueue chan requestToEnqueue requestsSent chan *SchedulerRequest requestsCompleted chan *SchedulerRequest @@ -181,9 +178,9 @@ func NewRequestQueue( queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, enqueueDuration prometheus.Histogram, - querierInflightRequestsGauge *prometheus.GaugeVec, + querierInflightRequestsMetric *prometheus.SummaryVec, ) (*RequestQueue, error) { - queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsGauge) + queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsMetric) if err != nil { return nil, err } @@ -206,7 +203,6 @@ func NewRequestQueue( stopCompleted: make(chan struct{}), requestsToEnqueue: make(chan requestToEnqueue), - querierInflightRequests: map[RequestKey]*SchedulerRequest{}, requestsSent: make(chan *SchedulerRequest), requestsCompleted: make(chan *SchedulerRequest), querierOperations: make(chan querierOperation), @@ -217,7 +213,7 @@ func NewRequestQueue( queueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, forgetDelay), } - q.Service = services.NewTimerService(forgetCheckPeriod, q.starting, q.forgetDisconnectedQueriers, q.stop).WithName("request queue") + q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue") return q, nil } @@ -229,6 +225,29 @@ func (q *RequestQueue) starting(_ context.Context) error { return nil } +func (q *RequestQueue) running(ctx context.Context) error { + // periodically submit a message to dispatcherLoop to forget disconnected queriers + forgetDisconnectedQueriersTicker := time.NewTicker(forgetCheckPeriod) + defer forgetDisconnectedQueriersTicker.Stop() + + // periodically submit a message to dispatcherLoop to observe inflight requests; + // same as scheduler, we observe inflight requests frequently and 
at regular intervals + // to have a good approximation of max inflight requests over percentiles of time. + inflightRequestsTicker := time.NewTicker(250 * time.Millisecond) + defer inflightRequestsTicker.Stop() + + for { + select { + case <-forgetDisconnectedQueriersTicker.C: + q.submitForgetDisconnectedQueriers(ctx) + case <-inflightRequestsTicker.C: + q.QueryComponentUtilization.ObserveInflightRequests() + case <-ctx.Done(): + return nil + } + } +} + func (q *RequestQueue) dispatcherLoop() { stopping := false @@ -248,10 +267,6 @@ func (q *RequestQueue) dispatcherLoop() { if err == nil { needToDispatchQueries = true } - case sentReq := <-q.requestsSent: - q.processRequestSent(sentReq) - case completedReq := <-q.requestsCompleted: - q.processRequestCompleted(completedReq) case waitingConn := <-q.waitingQuerierConns: requestSent := q.trySendNextRequestForQuerier(waitingConn) if !requestSent { @@ -470,9 +485,8 @@ func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 { return float64(q.connectedQuerierWorkers.Load()) } -func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error { +func (q *RequestQueue) submitForgetDisconnectedQueriers(_ context.Context) { q.submitQuerierOperation("", forgetDisconnected) - return nil } func (q *RequestQueue) SubmitRegisterQuerierConnection(querierID string) { @@ -534,41 +548,6 @@ func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) { return q.queueBroker.forgetDisconnectedQueriers(time.Now()) } -func (q *RequestQueue) SubmitRequestSent(req *SchedulerRequest) { - if req != nil { - select { - case q.requestsSent <- req: - case <-q.stopCompleted: - } - } -} - -func (q *RequestQueue) processRequestSent(req *SchedulerRequest) { - if req != nil { - q.querierInflightRequests[req.Key()] = req - q.QueryComponentUtilization.IncrementForComponentName(req.ExpectedQueryComponentName()) - } -} - -func (q *RequestQueue) SubmitRequestCompleted(req *SchedulerRequest) { - if req != nil { - select { - 
case q.requestsCompleted <- req: - case <-q.stopCompleted: - } - } -} - -func (q *RequestQueue) processRequestCompleted(req *SchedulerRequest) { - if req != nil { - reqKey := req.Key() - if req, ok := q.querierInflightRequests[reqKey]; ok { - q.QueryComponentUtilization.DecrementForComponentName(req.ExpectedQueryComponentName()) - } - delete(q.querierInflightRequests, reqKey) - } -} - // waitingQuerierConn is a "request" indicating that the querier is ready to receive the next query request. // It embeds the unbuffered `recvChan` to receive the requestForQuerier "response" from the RequestQueue. // The request/response terminology is avoided in naming to disambiguate with the actual query requests. diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 9664f98d9b2..d4e80093aa4 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -134,7 +134,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) { promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -239,7 +239,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) { promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(b, err) @@ -411,7 +411,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe 
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -480,7 +480,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -564,7 +564,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -605,7 +605,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -634,7 +634,7 @@ func 
TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index af36b51a5d2..3ef86917672 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -145,13 +145,17 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe Name: "cortex_query_scheduler_enqueue_duration_seconds", Help: "Time spent by requests waiting to join the queue or be rejected.", }) - querierInflightRequestsGauge := promauto.With(registerer).NewGaugeVec( - prometheus.GaugeOpts{ - Name: "cortex_query_scheduler_querier_inflight_requests", - Help: "Number of inflight requests being processed on all querier-scheduler connections.", + querierInflightRequestsMetric := promauto.With(registerer).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "cortex_query_scheduler_querier_inflight_requests", + Help: "Number of inflight requests being processed on all querier-scheduler connections. 
Quantile buckets keep track of inflight requests over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, }, []string{"query_component"}, ) + s.requestQueue, err = queue.NewRequestQueue( s.log, cfg.MaxOutstandingPerTenant, @@ -160,7 +164,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.queueLength, s.discardedRequests, enqueueDuration, - querierInflightRequestsGauge, + querierInflightRequestsMetric, ) if err != nil { return nil, err @@ -278,7 +282,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front requestKey := queue.NewSchedulerRequestKey(frontendAddress, msg.QueryID) schedulerReq := s.cancelRequestAndRemoveFromPending(requestKey, "frontend cancelled query") // we may not have reached SubmitRequestSent for this query, but RequestQueue will handle this case - s.requestQueue.SubmitRequestCompleted(schedulerReq) + s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(schedulerReq) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} default: @@ -447,9 +451,9 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL */ if schedulerReq.Ctx.Err() != nil { - // remove from pending requests; - // no need to SubmitRequestCompleted to RequestQueue as we had not yet SubmitRequestSent + // remove from pending requests s.cancelRequestAndRemoveFromPending(schedulerReq.Key(), "request cancelled") + s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(schedulerReq) lastUserIndex = lastUserIndex.ReuseLastTenant() continue } @@ -470,8 +474,8 @@ func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.No } func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *queue.SchedulerRequest, queueTime time.Duration) error { - s.requestQueue.SubmitRequestSent(req) - defer
s.requestQueue.SubmitRequestCompleted(req) + s.requestQueue.QueryComponentUtilization.MarkRequestSent(req) + defer s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(req) defer s.cancelRequestAndRemoveFromPending(req.Key(), "request complete") // Handle the stream sending & receiving on a goroutine so we can