From 7be8c66fed9329883c35082a23b46e0a65143837 Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 18 Jun 2024 13:21:40 -0700 Subject: [PATCH 1/9] query scheduler querier inflight requests convert to summary metric --- pkg/frontend/v1/frontend.go | 11 +++++++---- pkg/scheduler/queue/query_component_utilization.go | 11 ++++++----- pkg/scheduler/queue/queue.go | 4 ++-- pkg/scheduler/scheduler.go | 14 +++++++++----- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 5270af11801..62e67b8e632 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -113,10 +113,13 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist Name: "cortex_query_frontend_enqueue_duration_seconds", Help: "Time spent by requests waiting to join the queue or be rejected.", }) - querierInflightRequests := promauto.With(registerer).NewGaugeVec( - prometheus.GaugeOpts{ - Name: "cortex_query_frontend_querier_inflight_requests", - Help: "Number of inflight requests being processed on all querier-scheduler connections.", + querierInflightRequests := promauto.With(registerer).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "cortex_query_scheduler_querier_inflight_requests", + Help: "Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, }, []string{"query_component"}, ) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index 9a6e9555781..fa96a7a00a4 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -56,7 +56,7 @@ type QueryComponentUtilization struct { storeGatewayInflightRequests int querierInflightRequestsTotal int - querierInflightRequestsGauge *prometheus.GaugeVec + querierInflightRequestsMetric *prometheus.SummaryVec } // DefaultReservedQueryComponentCapacity reserves 1 / 3 of querier-worker connections @@ -73,7 +73,7 @@ const MaxReservedQueryComponentCapacity = 0.5 func NewQueryComponentUtilization( targetReservedCapacity float64, - querierInflightRequests *prometheus.GaugeVec, + querierInflightRequestsMetric *prometheus.SummaryVec, ) (*QueryComponentUtilization, error) { if targetReservedCapacity >= MaxReservedQueryComponentCapacity { @@ -87,7 +87,7 @@ func NewQueryComponentUtilization( storeGatewayInflightRequests: 0, querierInflightRequestsTotal: 0, - querierInflightRequestsGauge: querierInflightRequests, + querierInflightRequestsMetric: querierInflightRequestsMetric, }, nil } @@ -163,12 +163,13 @@ func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryCompon if isIngester { qcl.ingesterInflightRequests += increment - qcl.querierInflightRequestsGauge.WithLabelValues(string(Ingester)).Add(float64(increment)) } if isStoreGateway { qcl.storeGatewayInflightRequests += increment - qcl.querierInflightRequestsGauge.WithLabelValues(string(StoreGateway)).Add(float64(increment)) } + + qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests)) + qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests)) qcl.querierInflightRequestsTotal += increment } diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 7cf73afbf29..1ce0dc448ac 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -181,9 +181,9 @@ func NewRequestQueue( queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, enqueueDuration prometheus.Histogram, - querierInflightRequestsGauge *prometheus.GaugeVec, + querierInflightRequestsMetric *prometheus.SummaryVec, ) (*RequestQueue, error) { - queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsGauge) + queryComponentCapacity, err := NewQueryComponentUtilization(DefaultReservedQueryComponentCapacity, querierInflightRequestsMetric) if err != nil { return nil, err } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index af36b51a5d2..db0b71a012a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -145,13 +145,17 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe Name: "cortex_query_scheduler_enqueue_duration_seconds", Help: "Time spent by requests waiting to join the queue or be rejected.", }) - querierInflightRequestsGauge := promauto.With(registerer).NewGaugeVec( - prometheus.GaugeOpts{ - Name: "cortex_query_scheduler_querier_inflight_requests", - Help: "Number of inflight requests being processed on all querier-scheduler connections.", + querierInflightRequestsMetric := promauto.With(registerer).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "cortex_query_scheduler_querier_inflight_requests", + Help: "Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, }, []string{"query_component"}, ) + s.requestQueue, err = queue.NewRequestQueue( s.log, cfg.MaxOutstandingPerTenant, @@ -160,7 +164,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.queueLength, s.discardedRequests, enqueueDuration, - querierInflightRequestsGauge, + querierInflightRequestsMetric, ) if err != nil { return nil, err From 585dd996a695affc2b2beb7b679d2e9da847848a Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 18 Jun 2024 13:46:09 -0700 Subject: [PATCH 2/9] fix test --- pkg/scheduler/queue/queue_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 9664f98d9b2..d4e80093aa4 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -134,7 +134,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) { promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -239,7 +239,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) { promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(b, err) @@ -411,7 +411,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -480,7 +480,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -564,7 +564,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -605,7 +605,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) @@ -634,7 +634,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}), - promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewSummaryVec(prometheus.SummaryOpts{}, []string{"query_component"}), ) require.NoError(t, err) From c6040161767b349e91dd2413353ac15d00577e68 Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 18 Jun 2024 13:51:00 -0700 Subject: [PATCH 3/9] fix other test --- .../queue/query_component_utilization_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/queue/query_component_utilization_test.go b/pkg/scheduler/queue/query_component_utilization_test.go index d025ba9e4ff..7d0e7d554ed 100644 --- a/pkg/scheduler/queue/query_component_utilization_test.go +++ b/pkg/scheduler/queue/query_component_utilization_test.go @@ -4,17 +4,24 @@ package queue import ( "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/require" ) -func testQuerierInflightRequestsGauge() *prometheus.GaugeVec { - return promauto.With(prometheus.NewPedanticRegistry()).NewGaugeVec(prometheus.GaugeOpts{ - Name: "test_query_scheduler_querier_inflight_requests", - Help: "[test] Number of inflight requests being processed on a querier-scheduler connection.", - }, []string{"query_component"}) +func testQuerierInflightRequestsGauge() *prometheus.SummaryVec { + return promauto.With(prometheus.NewPedanticRegistry()).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "test_cortex_query_scheduler_querier_inflight_requests", + Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, + }, + []string{"query_component"}, + ) } func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) { From 6bb39f39d892758837dc69dada018bef0cee987b Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 18 Jun 2024 15:11:23 -0700 Subject: [PATCH 4/9] fix bad copy-paste --- pkg/frontend/v1/frontend.go | 4 ++-- pkg/scheduler/queue/query_component_utilization_test.go | 2 +- pkg/scheduler/scheduler.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 62e67b8e632..a804b95057c 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -115,8 +115,8 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist }) querierInflightRequests := promauto.With(registerer).NewSummaryVec( prometheus.SummaryOpts{ - Name: "cortex_query_scheduler_querier_inflight_requests", - Help: "Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.", + Name: "cortex_query_frontend_querier_inflight_requests", + Help: "Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.", Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, MaxAge: time.Minute, AgeBuckets: 6, diff --git a/pkg/scheduler/queue/query_component_utilization_test.go b/pkg/scheduler/queue/query_component_utilization_test.go index 7d0e7d554ed..4202990164b 100644 --- a/pkg/scheduler/queue/query_component_utilization_test.go +++ b/pkg/scheduler/queue/query_component_utilization_test.go @@ -15,7 +15,7 @@ func testQuerierInflightRequestsGauge() *prometheus.SummaryVec { return promauto.With(prometheus.NewPedanticRegistry()).NewSummaryVec( prometheus.SummaryOpts{ Name: "test_cortex_query_scheduler_querier_inflight_requests", - Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.", + Help: "[test] Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.", Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, MaxAge: time.Minute, AgeBuckets: 6, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index db0b71a012a..23c3d1aa603 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -148,7 +148,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe querierInflightRequestsMetric := promauto.With(registerer).NewSummaryVec( prometheus.SummaryOpts{ Name: "cortex_query_scheduler_querier_inflight_requests", - Help: "Number of inflight requests being processed on all querier-scheduler connections. . Quantile buckets keep track of inflight requests over the last 60s.", + Help: "Number of inflight requests being processed on all querier-scheduler connections. Quantile buckets keep track of inflight requests over the last 60s.", Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, MaxAge: time.Minute, AgeBuckets: 6, From 27611b397fea2d399aa5d67fb8a6e4f28e4f10f3 Mon Sep 17 00:00:00 2001 From: francoposa Date: Thu, 20 Jun 2024 21:22:52 -0700 Subject: [PATCH 5/9] WIP: request queu observe inflight requests on ticker like scheduler --- .../queue/query_component_utilization.go | 7 ++- pkg/scheduler/queue/queue.go | 45 +++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index fa96a7a00a4..758fc1fb17c 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -168,9 +168,14 @@ func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryCompon qcl.storeGatewayInflightRequests += increment } + //qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests)) + //qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests)) + qcl.querierInflightRequestsTotal += increment +} + +func (qcl *QueryComponentUtilization) ObserveInflightRequests() { qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests)) qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests)) - qcl.querierInflightRequestsTotal += increment } // GetForComponent is a test-only util diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 1ce0dc448ac..1af248407b4 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -133,6 +133,8 @@ type RequestQueue struct { stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. + observeInflightRequests chan struct{} + // querierInflightRequests tracks requests from the time the request was successfully sent to a querier // to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect. querierInflightRequests map[RequestKey]*SchedulerRequest @@ -205,6 +207,8 @@ func NewRequestQueue( stopRequested: make(chan struct{}), stopCompleted: make(chan struct{}), + observeInflightRequests: make(chan struct{}), + requestsToEnqueue: make(chan requestToEnqueue), querierInflightRequests: map[RequestKey]*SchedulerRequest{}, requestsSent: make(chan *SchedulerRequest), @@ -217,7 +221,7 @@ func NewRequestQueue( queueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, forgetDelay), } - q.Service = services.NewTimerService(forgetCheckPeriod, q.starting, q.forgetDisconnectedQueriers, q.stop).WithName("request queue") + q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue") return q, nil } @@ -229,6 +233,29 @@ func (q *RequestQueue) starting(_ context.Context) error { return nil } +func (q *RequestQueue) running(ctx context.Context) error { + // periodically submit a message to dispatcherLoop to forget disconnected queriers + forgetDisconnectedQueriersTicker := time.NewTicker(forgetCheckPeriod) + defer forgetDisconnectedQueriersTicker.Stop() + + // periodically submit a message to dispatcherLoop to observe inflight requests; + // same as scheduler, we observe inflight requests frequently and at regular intervals + // to have a good approximation of max inflight requests over percentiles of time. + inflightRequestsTicker := time.NewTicker(250 * time.Millisecond) + defer inflightRequestsTicker.Stop() + + for { + select { + case <-forgetDisconnectedQueriersTicker.C: + q.submitForgetDisconnectedQueriers(ctx) + case <-inflightRequestsTicker.C: + q.submitObserveInflightRequests() + case <-ctx.Done(): + return nil + } + } +} + func (q *RequestQueue) dispatcherLoop() { stopping := false @@ -239,6 +266,8 @@ func (q *RequestQueue) dispatcherLoop() { case <-q.stopRequested: // Nothing much to do here - fall through to the stop logic below to see if we can stop immediately. stopping = true + case <-q.observeInflightRequests: + q.processObserveInflightRequests() case querierOp := <-q.querierOperations: // Need to attempt to dispatch queries only if querier operation results in a resharding needToDispatchQueries = q.processQuerierOperation(querierOp) @@ -470,9 +499,8 @@ func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 { return float64(q.connectedQuerierWorkers.Load()) } -func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error { +func (q *RequestQueue) submitForgetDisconnectedQueriers(_ context.Context) { q.submitQuerierOperation("", forgetDisconnected) - return nil } func (q *RequestQueue) SubmitRegisterQuerierConnection(querierID string) { @@ -534,6 +562,17 @@ func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) { return q.queueBroker.forgetDisconnectedQueriers(time.Now()) } +func (q *RequestQueue) submitObserveInflightRequests() { + select { + case q.observeInflightRequests <- struct{}{}: + case <-q.stopCompleted: + } +} + +func (q *RequestQueue) processObserveInflightRequests() { + q.QueryComponentUtilization.ObserveInflightRequests() +} + func (q *RequestQueue) SubmitRequestSent(req *SchedulerRequest) { if req != nil { select { From c485ee0e8046e5b0de77db4b26eb2a60ea2780bb Mon Sep 17 00:00:00 2001 From: francoposa Date: Fri, 21 Jun 2024 12:39:15 -0700 Subject: [PATCH 6/9] rm commented lines --- pkg/scheduler/queue/query_component_utilization.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index 758fc1fb17c..ec3a2ff2887 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -167,9 +167,6 @@ func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryCompon if isStoreGateway { qcl.storeGatewayInflightRequests += increment } - - //qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests)) - //qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests)) qcl.querierInflightRequestsTotal += increment } From ebff77fe1ab648146d801599416894ab74513abd Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 25 Jun 2024 12:50:46 -0700 Subject: [PATCH 7/9] convert querier-scheduler inflight requests observation to use atomics --- .../queue/query_component_utilization.go | 10 ++++++++++ pkg/scheduler/queue/queue.go | 15 +-------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index ec3a2ff2887..9c96311e883 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -4,6 +4,7 @@ package queue import ( "math" + "sync" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -52,6 +53,7 @@ type QueryComponentUtilization struct { // for queries to the less-loaded query component when the query queue becomes backlogged. targetReservedCapacity float64 + inflightRequestsMu sync.RWMutex ingesterInflightRequests int storeGatewayInflightRequests int querierInflightRequestsTotal int @@ -135,6 +137,8 @@ func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName( } isIngester, isStoreGateway := queryComponentFlags(name) + qcl.inflightRequestsMu.RLock() + defer qcl.inflightRequestsMu.RUnlock() if isIngester { if connectedWorkers-(qcl.ingesterInflightRequests) <= minReservedConnections { return true, Ingester @@ -161,6 +165,8 @@ func (qcl *QueryComponentUtilization) DecrementForComponentName(expectedQueryCom func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryComponent string, increment int) { isIngester, isStoreGateway := queryComponentFlags(expectedQueryComponent) + qcl.inflightRequestsMu.Lock() + defer qcl.inflightRequestsMu.Unlock() if isIngester { qcl.ingesterInflightRequests += increment } @@ -171,12 +177,16 @@ func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryCompon } func (qcl *QueryComponentUtilization) ObserveInflightRequests() { + qcl.inflightRequestsMu.RLock() + defer qcl.inflightRequestsMu.RUnlock() qcl.querierInflightRequestsMetric.WithLabelValues(string(Ingester)).Observe(float64(qcl.ingesterInflightRequests)) qcl.querierInflightRequestsMetric.WithLabelValues(string(StoreGateway)).Observe(float64(qcl.storeGatewayInflightRequests)) } // GetForComponent is a test-only util func (qcl *QueryComponentUtilization) GetForComponent(component QueryComponent) int { + qcl.inflightRequestsMu.RLock() + defer qcl.inflightRequestsMu.RUnlock() switch component { case Ingester: return qcl.ingesterInflightRequests diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 1af248407b4..f1c752e7595 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -249,7 +249,7 @@ func (q *RequestQueue) running(ctx context.Context) error { case <-forgetDisconnectedQueriersTicker.C: q.submitForgetDisconnectedQueriers(ctx) case <-inflightRequestsTicker.C: - q.submitObserveInflightRequests() + q.QueryComponentUtilization.ObserveInflightRequests() case <-ctx.Done(): return nil } @@ -266,8 +266,6 @@ func (q *RequestQueue) dispatcherLoop() { case <-q.stopRequested: // Nothing much to do here - fall through to the stop logic below to see if we can stop immediately. stopping = true - case <-q.observeInflightRequests: - q.processObserveInflightRequests() case querierOp := <-q.querierOperations: // Need to attempt to dispatch queries only if querier operation results in a resharding needToDispatchQueries = q.processQuerierOperation(querierOp) @@ -562,17 +560,6 @@ func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) { return q.queueBroker.forgetDisconnectedQueriers(time.Now()) } -func (q *RequestQueue) submitObserveInflightRequests() { - select { - case q.observeInflightRequests <- struct{}{}: - case <-q.stopCompleted: - } -} - -func (q *RequestQueue) processObserveInflightRequests() { - q.QueryComponentUtilization.ObserveInflightRequests() -} - func (q *RequestQueue) SubmitRequestSent(req *SchedulerRequest) { if req != nil { select { From 23598077c17fef1ec78819f3f1c38502268e3fc8 Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 25 Jun 2024 13:36:28 -0700 Subject: [PATCH 8/9] convert all querier-scheduler inflight request tracking to utilize atomics --- .../queue/query_component_utilization.go | 41 ++++++++++++++---- .../queue/query_component_utilization_test.go | 14 +++++- pkg/scheduler/queue/queue.go | 43 ------------------- pkg/scheduler/scheduler.go | 10 ++--- 4 files changed, 50 insertions(+), 58 deletions(-) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index 9c96311e883..ad09fd3e2df 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -53,7 +53,10 @@ type QueryComponentUtilization struct { // for queries to the less-loaded query component when the query queue becomes backlogged. targetReservedCapacity float64 - inflightRequestsMu sync.RWMutex + inflightRequestsMu sync.RWMutex + // inflightRequests tracks requests from the time the request was successfully sent to a querier + // to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect. + inflightRequests map[RequestKey]*SchedulerRequest ingesterInflightRequests int storeGatewayInflightRequests int querierInflightRequestsTotal int @@ -85,6 +88,7 @@ func NewQueryComponentUtilization( return &QueryComponentUtilization{ targetReservedCapacity: targetReservedCapacity, + inflightRequests: map[RequestKey]*SchedulerRequest{}, ingesterInflightRequests: 0, storeGatewayInflightRequests: 0, querierInflightRequestsTotal: 0, @@ -152,21 +156,42 @@ func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName( return false, "" } -// IncrementForComponentName is called when a request is sent to a querier -func (qcl *QueryComponentUtilization) IncrementForComponentName(expectedQueryComponent string) { +func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest) { + if req != nil { + qcl.inflightRequestsMu.Lock() + defer qcl.inflightRequestsMu.Unlock() + + qcl.inflightRequests[req.Key()] = req + qcl.incrementForComponentName(req.ExpectedQueryComponentName()) + } +} + +func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest) { + if req != nil { + qcl.inflightRequestsMu.Lock() + defer qcl.inflightRequestsMu.Unlock() + + reqKey := req.Key() + if req, ok := qcl.inflightRequests[reqKey]; ok { + qcl.decrementForComponentName(req.ExpectedQueryComponentName()) + } + delete(qcl.inflightRequests, reqKey) + } +} + +// incrementForComponentName is called when a request is sent to a querier +func (qcl *QueryComponentUtilization) incrementForComponentName(expectedQueryComponent string) { qcl.updateForComponentName(expectedQueryComponent, 1) } -// DecrementForComponentName is called when a querier completes or fails a request -func (qcl *QueryComponentUtilization) DecrementForComponentName(expectedQueryComponent string) { +// decrementForComponentName is called when a querier completes or fails a request +func (qcl *QueryComponentUtilization) decrementForComponentName(expectedQueryComponent string) { qcl.updateForComponentName(expectedQueryComponent, -1) } func (qcl *QueryComponentUtilization) updateForComponentName(expectedQueryComponent string, increment int) { isIngester, isStoreGateway := queryComponentFlags(expectedQueryComponent) - - qcl.inflightRequestsMu.Lock() - defer qcl.inflightRequestsMu.Unlock() + // lock is expected to be obtained by the calling method to mark the request as sent or completed if isIngester { qcl.ingesterInflightRequests += increment } diff --git a/pkg/scheduler/queue/query_component_utilization_test.go b/pkg/scheduler/queue/query_component_utilization_test.go index 4202990164b..44bfb2ec82f 100644 --- a/pkg/scheduler/queue/query_component_utilization_test.go +++ b/pkg/scheduler/queue/query_component_utilization_test.go @@ -223,11 +223,21 @@ func TestExceedsUtilizationThresholdForQueryComponents(t *testing.T) { require.NoError(t, err) for i := 0; i < testCase.ingesterInflightRequests; i++ { - queryComponentUtilization.IncrementForComponentName(ingesterQueueDimension) + ingesterInflightRequest := &SchedulerRequest{ + FrontendAddr: "frontend-a", + QueryID: uint64(i), + AdditionalQueueDimensions: []string{ingesterQueueDimension}, + } + queryComponentUtilization.MarkRequestSent(ingesterInflightRequest) } for i := 0; i < testCase.storeGatewayInflightRequests; i++ { - queryComponentUtilization.IncrementForComponentName(storeGatewayQueueDimension) + storeGatewayInflightRequest := &SchedulerRequest{ + FrontendAddr: "frontend-b", + QueryID: uint64(i), + AdditionalQueueDimensions: []string{storeGatewayQueueDimension}, + } + queryComponentUtilization.MarkRequestSent(storeGatewayInflightRequest) } exceedsThreshold, queryComponent := queryComponentUtilization.ExceedsThresholdForComponentName( diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index f1c752e7595..3951b537c9b 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -135,9 +135,6 @@ type RequestQueue struct { observeInflightRequests chan struct{} - // querierInflightRequests tracks requests from the time the request was successfully sent to a querier - // to the time the request was completed by the querier or failed due to cancel, timeout, or disconnect. - querierInflightRequests map[RequestKey]*SchedulerRequest requestsToEnqueue chan requestToEnqueue requestsSent chan *SchedulerRequest requestsCompleted chan *SchedulerRequest @@ -210,7 +207,6 @@ func NewRequestQueue( observeInflightRequests: make(chan struct{}), requestsToEnqueue: make(chan requestToEnqueue), - querierInflightRequests: map[RequestKey]*SchedulerRequest{}, requestsSent: make(chan *SchedulerRequest), requestsCompleted: make(chan *SchedulerRequest), querierOperations: make(chan querierOperation), @@ -275,10 +271,6 @@ func (q *RequestQueue) dispatcherLoop() { if err == nil { needToDispatchQueries = true } - case sentReq := <-q.requestsSent: - q.processRequestSent(sentReq) - case completedReq := <-q.requestsCompleted: - q.processRequestCompleted(completedReq) case waitingConn := <-q.waitingQuerierConns: requestSent := q.trySendNextRequestForQuerier(waitingConn) if !requestSent { @@ -560,41 +552,6 @@ func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) { return q.queueBroker.forgetDisconnectedQueriers(time.Now()) } -func (q *RequestQueue) SubmitRequestSent(req *SchedulerRequest) { - if req != nil { - select { - case q.requestsSent <- req: - case <-q.stopCompleted: - } - } -} - -func (q *RequestQueue) processRequestSent(req *SchedulerRequest) { - if req != nil { - q.querierInflightRequests[req.Key()] = req - q.QueryComponentUtilization.IncrementForComponentName(req.ExpectedQueryComponentName()) - } -} - -func (q *RequestQueue) SubmitRequestCompleted(req *SchedulerRequest) { - if req != nil { - select { - case q.requestsCompleted <- req: - case <-q.stopCompleted: - } - } -} - -func (q *RequestQueue) processRequestCompleted(req *SchedulerRequest) { - if req != nil { - reqKey := req.Key() - if req, ok := q.querierInflightRequests[reqKey]; ok { - q.QueryComponentUtilization.DecrementForComponentName(req.ExpectedQueryComponentName()) - } - delete(q.querierInflightRequests, reqKey) - } -} - // waitingQuerierConn is a "request" indicating that the querier is ready to receive the next query request. // It embeds the unbuffered `recvChan` to receive the requestForQuerier "response" from the RequestQueue. // The request/response terminology is avoided in naming to disambiguate with the actual query requests. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 23c3d1aa603..3ef86917672 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -282,7 +282,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front requestKey := queue.NewSchedulerRequestKey(frontendAddress, msg.QueryID) schedulerReq := s.cancelRequestAndRemoveFromPending(requestKey, "frontend cancelled query") // we may not have reached SubmitRequestSent for this query, but RequestQueue will handle this case - s.requestQueue.SubmitRequestCompleted(schedulerReq) + s.requestQueue.QueryComponentUtilization.MarkRequestSent(schedulerReq) resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} default: @@ -451,9 +451,9 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL */ if schedulerReq.Ctx.Err() != nil { - // remove from pending requests; - // no need to SubmitRequestCompleted to RequestQueue as we had not yet SubmitRequestSent + // remove from pending requests s.cancelRequestAndRemoveFromPending(schedulerReq.Key(), "request cancelled") + s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(schedulerReq) lastUserIndex = lastUserIndex.ReuseLastTenant() continue } @@ -474,8 +474,8 @@ func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.No } func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *queue.SchedulerRequest, queueTime time.Duration) error { - s.requestQueue.SubmitRequestSent(req) - defer s.requestQueue.SubmitRequestCompleted(req) + s.requestQueue.QueryComponentUtilization.MarkRequestSent(req) + defer s.requestQueue.QueryComponentUtilization.MarkRequestCompleted(req) defer s.cancelRequestAndRemoveFromPending(req.Key(), "request complete") // Handle the stream sending & receiving on a goroutine so we can From 4f1e32b0fc37a3b6f82f547492d43ca327d25a94 Mon Sep 17 00:00:00 2001 From: francoposa Date: Tue, 25 Jun 2024 13:53:27 -0700 Subject: [PATCH 9/9] cleanup --- pkg/scheduler/queue/query_component_utilization.go | 4 ++-- pkg/scheduler/queue/queue.go | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/queue/query_component_utilization.go b/pkg/scheduler/queue/query_component_utilization.go index ad09fd3e2df..320243c1878 100644 --- a/pkg/scheduler/queue/query_component_utilization.go +++ b/pkg/scheduler/queue/query_component_utilization.go @@ -156,6 +156,7 @@ func (qcl *QueryComponentUtilization) ExceedsThresholdForComponentName( return false, "" } +// MarkRequestSent is called when a request is sent to a querier func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest) { if req != nil { qcl.inflightRequestsMu.Lock() @@ -166,6 +167,7 @@ func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest) { } } +// MarkRequestCompleted is called when a querier completes or fails a request func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest) { if req != nil { qcl.inflightRequestsMu.Lock() @@ -179,12 +181,10 @@ func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest } } -// incrementForComponentName is called when a request is sent to a querier func (qcl *QueryComponentUtilization) incrementForComponentName(expectedQueryComponent string) { qcl.updateForComponentName(expectedQueryComponent, 1) } -// decrementForComponentName is called when a querier completes or fails a request func (qcl *QueryComponentUtilization) decrementForComponentName(expectedQueryComponent string) { qcl.updateForComponentName(expectedQueryComponent, -1) } diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 3951b537c9b..fc2576fca6b 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -133,8 +133,6 @@ type RequestQueue struct { stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request. stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped. - observeInflightRequests chan struct{} - requestsToEnqueue chan requestToEnqueue requestsSent chan *SchedulerRequest requestsCompleted chan *SchedulerRequest @@ -204,8 +202,6 @@ func NewRequestQueue( stopRequested: make(chan struct{}), stopCompleted: make(chan struct{}), - observeInflightRequests: make(chan struct{}), - requestsToEnqueue: make(chan requestToEnqueue), requestsSent: make(chan *SchedulerRequest), requestsCompleted: make(chan *SchedulerRequest),