diff --git a/CHANGELOG.md b/CHANGELOG.md index 460dbf16f7b..05191221332 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,7 @@ * [EHNAHCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294 * [ENHANCEMENT] Ingester: reduce locked time while matching postings for a label, improving the write latency and compaction speed. #8327 * [ENHANCEMENT] Ingester: reduce the amount of locks taken during the Head compaction's garbage-collection process, improving the write latency and compaction speed. #8327 -* [ENHANCEMENT] Query-scheduler: Introduce `prioritizeQueryComponents`, which allows configuration of the query-scheduler tree to prioritize dequeuing from a specific query component more highly than dequeueing from a specific tenant. #7873 +* [ENHANCEMENT] Query-scheduler: Introduce `query-frontend.prioritize-query-components` and `query-scheduler.prioritize-query-components`, which allow configuration of the request queue tree to prioritize dequeuing from a specific query component more highly than dequeuing from a specific tenant. #7873 * [BUGFIX] Distributor: prometheus retry on 5xx and 429 errors, while otlp collector only retry on 429, 502, 503 and 504, mapping other 5xx errors to the retryable ones in otlp endpoint. #8324 * [BUGFIX] Distributor: make OTLP endpoint return marshalled proto bytes as response body for 4xx/5xx errors. #8227 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. 
#7567 diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 737f8e308ae..82591a1f4db 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -9,7 +9,6 @@ import ( "context" "errors" "fmt" - promtest "github.com/prometheus/client_golang/prometheus/testutil" "math/rand" "strconv" "strings" @@ -22,6 +21,7 @@ import ( "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" diff --git a/pkg/scheduler/queue/tenant_querier_assignment.go b/pkg/scheduler/queue/tenant_querier_assignment.go index 42743e9f8e8..d2293ee7243 100644 --- a/pkg/scheduler/queue/tenant_querier_assignment.go +++ b/pkg/scheduler/queue/tenant_querier_assignment.go @@ -96,14 +96,6 @@ type tenantQuerierAssignments struct { currentQuerier *QuerierID } -func (tqa *tenantQuerierAssignments) getTenant(tenantID TenantID) (*queueTenant, error) { - if tenantID == emptyTenantID { - return nil, ErrInvalidTenantID - } - tenant := tqa.tenantsByID[tenantID] - return tenant, nil -} - // createOrUpdateTenant creates or updates a tenant into the tenant-querier assignment state. // // New tenants are added to the tenant order list and tenant-querier shards are shuffled if needed. 
diff --git a/pkg/scheduler/queue/tenant_queues_test.go b/pkg/scheduler/queue/tenant_queues_test.go index 858e7463f2a..a7a33ed4786 100644 --- a/pkg/scheduler/queue/tenant_queues_test.go +++ b/pkg/scheduler/queue/tenant_queues_test.go @@ -8,13 +8,14 @@ package queue import ( "context" "fmt" - "github.com/grafana/dskit/httpgrpc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "math" "math/rand" "testing" "time" + + "github.com/grafana/dskit/httpgrpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func (qb *queueBroker) enqueueObjectsForTests(tenantID TenantID, numObjects int) error { @@ -111,7 +112,7 @@ func TestQueues(t *testing.T) { {buildExpectedObject(tenantTwo.tenantID, 2), tenantTwo}, {buildExpectedObject(tenantOne.tenantID, 5), tenantOne}, } - lastTenantIndex = assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals) + _ = assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals) //[one two three] // confirm fifo by adding a third tenant queue and iterating to it @@ -310,7 +311,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) { // After disconnecting querier-2, it's expected to own no queue. 
qb.tenantQuerierAssignments.removeQuerier("querier-2") - req, tenant, qTwolastTenantIndex, err = qb.dequeueRequestForQuerier(qTwolastTenantIndex, "querier-2") + req, tenant, _, err = qb.dequeueRequestForQuerier(qTwolastTenantIndex, "querier-2") assert.Nil(t, req) assert.Nil(t, tenant) assert.Equal(t, ErrQuerierShuttingDown, err) diff --git a/pkg/scheduler/queue/tree_queue.go b/pkg/scheduler/queue/tree_queue.go index fa8f067f185..ddeb39e154d 100644 --- a/pkg/scheduler/queue/tree_queue.go +++ b/pkg/scheduler/queue/tree_queue.go @@ -118,9 +118,7 @@ func newNode(name string, depth int, da DequeueAlgorithm) (*Node, error) { if da == nil { return nil, fmt.Errorf("cannot create a node without a defined DequeueAlgorithm") } - switch da.(type) { - case *tenantQuerierAssignments: - tqa := da.(*tenantQuerierAssignments) + if tqa, ok := da.(*tenantQuerierAssignments); ok { tqa.tenantOrderIndex = localQueueIndex - 1 // start from -2 so that we first check local queue if tqa.tenantNodes == nil { tqa.tenantNodes = map[string][]*Node{} diff --git a/pkg/scheduler/queue/tree_queue_test.go b/pkg/scheduler/queue/tree_queue_test.go index cd59330259d..353524f0936 100644 --- a/pkg/scheduler/queue/tree_queue_test.go +++ b/pkg/scheduler/queue/tree_queue_test.go @@ -5,10 +5,9 @@ package queue import ( "fmt" "strings" + "testing" - //"fmt" "github.com/stretchr/testify/require" - "testing" ) func newTenantQuerierAssignments() *tenantQuerierAssignments { @@ -620,12 +619,22 @@ func Test_ChangeTenantQuerierAssignments(t *testing.T) { tree, err := NewTree(state, &roundRobinState{}, &roundRobinState{}) require.NoError(t, err) - err = tree.EnqueueBackByPath(QueuePath{"tenant-1", "query-component-1"}, "query-1") - err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-2") - err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-3") - err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-4") - err = 
tree.EnqueueBackByPath(QueuePath{"tenant-3", "query-component-1"}, "query-5") - require.NoError(t, err) + type enqueueObj struct { + obj any + path QueuePath + } + enqueueObjs := []enqueueObj{ + {"query-1", QueuePath{"tenant-1", "query-component-1"}}, + {"query-2", QueuePath{"tenant-2", "query-component-1"}}, + {"query-3", QueuePath{"tenant-2", "query-component-1"}}, + {"query-4", QueuePath{"tenant-2", "query-component-1"}}, + {"query-5", QueuePath{"tenant-3", "query-component-1"}}, + } + + for _, eo := range enqueueObjs { + err = tree.EnqueueBackByPath(eo.path, eo.obj) + require.NoError(t, err) + } querier1 := QuerierID("querier-1") querier2 := QuerierID("querier-2") diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a5a5aaccc6f..9d007324e02 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -95,7 +95,7 @@ type connectedFrontend struct { type Config struct { MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"` AdditionalQueryQueueDimensionsEnabled bool `yaml:"additional_query_queue_dimensions_enabled" category:"experimental"` - PrioritizeQueryComponentsEnabled bool `yaml:"prioritize_query_components_enabled" category:"experimental"` + PrioritizeQueryComponents bool `yaml:"prioritize_query_components" category:"experimental"` QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` @@ -105,6 +105,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. 
In-flight requests above this limit will fail with HTTP response status code 429.") f.BoolVar(&cfg.AdditionalQueryQueueDimensionsEnabled, "query-scheduler.additional-query-queue-dimensions-enabled", false, "Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)") + f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "Prioritize rotation through query components over rotation through tenants during dequeueing.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) @@ -157,7 +158,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.log, cfg.MaxOutstandingPerTenant, cfg.AdditionalQueryQueueDimensionsEnabled, - cfg.PrioritizeQueryComponentsEnabled, + cfg.PrioritizeQueryComponents, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests,