Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
chencs committed Jun 10, 2024
1 parent 2efe392 commit 6294be7
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 28 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* [EHNAHCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294
* [ENHANCEMENT] Ingester: reduce locked time while matching postings for a label, improving the write latency and compaction speed. #8327
* [ENHANCEMENT] Ingester: reduce the amount of locks taken during the Head compaction's garbage-collection process, improving the write latency and compaction speed. #8327
* [ENHANCEMENT] Query-scheduler: Introduce `prioritizeQueryComponents`, which allows configuration of the query-scheduler tree to prioritize dequeuing from a specific query component more highly than dequeueing from a specific tenant. #7873
* [ENHANCEMENT] Query-scheduler: Introduce `query-frontend.prioritize-query-components` and `query-scheduler.prioritize-query-components`, which allow configuration of the request queue tree to prioritize dequeuing from a specific query component more highly than dequeueing from a specific tenant. #7873
* [BUGFIX] Distributor: prometheus retry on 5xx and 429 errors, while otlp collector only retry on 429, 502, 503 and 504, mapping other 5xx errors to the retryable ones in otlp endpoint. #8324
* [BUGFIX] Distributor: make OTLP endpoint return marshalled proto bytes as response body for 4xx/5xx errors. #8227
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"math/rand"
"strconv"
"strings"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down
8 changes: 0 additions & 8 deletions pkg/scheduler/queue/tenant_querier_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ type tenantQuerierAssignments struct {
currentQuerier *QuerierID
}

func (tqa *tenantQuerierAssignments) getTenant(tenantID TenantID) (*queueTenant, error) {
if tenantID == emptyTenantID {
return nil, ErrInvalidTenantID
}
tenant := tqa.tenantsByID[tenantID]
return tenant, nil
}

// createOrUpdateTenant creates or updates a tenant into the tenant-querier assignment state.
//
// New tenants are added to the tenant order list and tenant-querier shards are shuffled if needed.
Expand Down
11 changes: 6 additions & 5 deletions pkg/scheduler/queue/tenant_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ package queue
import (
"context"
"fmt"
"github.com/grafana/dskit/httpgrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"math"
"math/rand"
"testing"
"time"

"github.com/grafana/dskit/httpgrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func (qb *queueBroker) enqueueObjectsForTests(tenantID TenantID, numObjects int) error {
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestQueues(t *testing.T) {
{buildExpectedObject(tenantTwo.tenantID, 2), tenantTwo},
{buildExpectedObject(tenantOne.tenantID, 5), tenantOne},
}
lastTenantIndex = assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals)
_ = assertExpectedValuesOnDequeue(t, qb, -1, "querier-2", expectedDequeueVals)

//[one two three]
// confirm fifo by adding a third tenant queue and iterating to it
Expand Down Expand Up @@ -310,7 +311,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {

// After disconnecting querier-2, it's expected to own no queue.
qb.tenantQuerierAssignments.removeQuerier("querier-2")
req, tenant, qTwolastTenantIndex, err = qb.dequeueRequestForQuerier(qTwolastTenantIndex, "querier-2")
req, tenant, _, _ = qb.dequeueRequestForQuerier(qTwolastTenantIndex, "querier-2")
assert.Nil(t, req)
assert.Nil(t, tenant)
assert.Equal(t, ErrQuerierShuttingDown, err)
Expand Down
4 changes: 1 addition & 3 deletions pkg/scheduler/queue/tree_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ func newNode(name string, depth int, da DequeueAlgorithm) (*Node, error) {
if da == nil {
return nil, fmt.Errorf("cannot create a node without a defined DequeueAlgorithm")
}
switch da.(type) {
case *tenantQuerierAssignments:
tqa := da.(*tenantQuerierAssignments)
if tqa, ok := da.(*tenantQuerierAssignments); ok {
tqa.tenantOrderIndex = localQueueIndex - 1 // start from -2 so that we first check local queue
if tqa.tenantNodes == nil {
tqa.tenantNodes = map[string][]*Node{}
Expand Down
25 changes: 17 additions & 8 deletions pkg/scheduler/queue/tree_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ package queue
import (
"fmt"
"strings"
"testing"

//"fmt"
"github.com/stretchr/testify/require"
"testing"
)

func newTenantQuerierAssignments() *tenantQuerierAssignments {
Expand Down Expand Up @@ -620,12 +619,22 @@ func Test_ChangeTenantQuerierAssignments(t *testing.T) {
tree, err := NewTree(state, &roundRobinState{}, &roundRobinState{})
require.NoError(t, err)

err = tree.EnqueueBackByPath(QueuePath{"tenant-1", "query-component-1"}, "query-1")
err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-2")
err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-3")
err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-4")
err = tree.EnqueueBackByPath(QueuePath{"tenant-3", "query-component-1"}, "query-5")
require.NoError(t, err)
type enqueueObj struct {
obj any
path QueuePath
}
enqueueObjs := []enqueueObj{
{"query-1", QueuePath{"tenant-1", "query-component-1"}},
{"query-2", QueuePath{"tenant-2", "query-component-1"}},
{"query-3", QueuePath{"tenant-2", "query-component-1"}},
{"query-4", QueuePath{"tenant-2", "query-component-1"}},
{"query-5", QueuePath{"tenant-3", "query-component-1"}},
}

for _, eo := range enqueueObjs {
err = tree.EnqueueBackByPath(eo.path, eo.obj)
require.NoError(t, err)
}

querier1 := QuerierID("querier-1")
querier2 := QuerierID("querier-2")
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type connectedFrontend struct {
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"`
AdditionalQueryQueueDimensionsEnabled bool `yaml:"additional_query_queue_dimensions_enabled" category:"experimental"`
PrioritizeQueryComponentsEnabled bool `yaml:"prioritize_query_components_enabled" category:"experimental"`
PrioritizeQueryComponents bool `yaml:"prioritize_query_components" category:"experimental"`
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
Expand All @@ -105,6 +105,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
f.BoolVar(&cfg.AdditionalQueryQueueDimensionsEnabled, "query-scheduler.additional-query-queue-dimensions-enabled", false, "Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)")
f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "Prioritize rotation through query components over rotation through tenants during dequeueing.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
Expand Down Expand Up @@ -157,7 +158,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
s.log,
cfg.MaxOutstandingPerTenant,
cfg.AdditionalQueryQueueDimensionsEnabled,
cfg.PrioritizeQueryComponentsEnabled,
cfg.PrioritizeQueryComponents,
cfg.QuerierForgetDelay,
s.queueLength,
s.discardedRequests,
Expand Down

0 comments on commit 6294be7

Please sign in to comment.