Skip to content

Commit

Permalink
renaming args re: PR feedback
Browse files — browse the repository at this point in the history
  • Loading branch information
chencs committed Jun 12, 2024
1 parent 6cba727 commit c46f9ff
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 44 deletions.
1 change: 0 additions & 1 deletion pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
log,
cfg.MaxOutstandingPerTenant,
false,
false,
cfg.QuerierForgetDelay,
f.queueLength,
f.discardedRequests,
Expand Down
5 changes: 1 addition & 4 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ type RequestQueue struct {
// settings
maxOutstandingPerTenant int
additionalQueueDimensionsEnabled bool
prioritizeQueryComponents bool
forgetDelay time.Duration

// metrics for reporting
Expand Down Expand Up @@ -178,7 +177,6 @@ func NewRequestQueue(
log log.Logger,
maxOutstandingPerTenant int,
additionalQueueDimensionsEnabled bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
Expand All @@ -195,7 +193,6 @@ func NewRequestQueue(
log: log,
maxOutstandingPerTenant: maxOutstandingPerTenant,
additionalQueueDimensionsEnabled: additionalQueueDimensionsEnabled,
prioritizeQueryComponents: prioritizeQueryComponents,
forgetDelay: forgetDelay,

// metrics for reporting
Expand All @@ -217,7 +214,7 @@ func NewRequestQueue(
waitingQuerierConnsToDispatch: list.New(),

QueryComponentUtilization: queryComponentCapacity,
queueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, prioritizeQueryComponents, forgetDelay),
queueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, false, forgetDelay),
}

q.Service = services.NewTimerService(forgetCheckPeriod, q.starting, q.forgetDisconnectedQueriers, q.stop).WithName("request queue")
Expand Down
11 changes: 3 additions & 8 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
additionalQueueDimensionsEnabled,
false,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -238,7 +237,6 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
true,
false,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -410,7 +408,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe

queue, err := NewRequestQueue(
log.NewNopLogger(),
1, true, false,
1, true,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -479,7 +477,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip

queue, err := NewRequestQueue(
log.NewNopLogger(),
1, true, false,
1, true,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -564,7 +562,6 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled
log.NewNopLogger(),
1,
true,
false,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -606,7 +603,6 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
log.NewNopLogger(),
1,
true,
false,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -636,7 +632,6 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
log.NewNopLogger(),
1,
true,
false,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand All @@ -647,7 +642,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend

// bypassing queue dispatcher loop for direct usage of the queueBroker and
// passing a waitingQuerierConn for a canceled querier connection
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, queue.additionalQueueDimensionsEnabled, queue.prioritizeQueryComponents, queue.forgetDelay)
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, queue.additionalQueueDimensionsEnabled, false, queue.forgetDelay)
queueBroker.addQuerierConnection(querierID)

tenantMaxQueriers := 0 // no sharding
Expand Down
22 changes: 11 additions & 11 deletions pkg/scheduler/queue/tenant_querier_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,13 @@ func (tqa *tenantQuerierAssignments) shuffleTenantQueriers(tenantID TenantID, sc
//
// Note that because we use the shared tenantIDOrder and tenantOrderIndex to manage the queue, we functionally
// ignore each Node's individual queueOrder and queuePosition.
func (tqa *tenantQuerierAssignments) dequeueSelectNode(dequeueFrom *Node) (*Node, bool) {
func (tqa *tenantQuerierAssignments) dequeueSelectNode(node *Node) (*Node, bool) {
// can't get a tenant if no querier set
if tqa.currentQuerier == nil {
return nil, true
}

checkedAllNodes := dequeueFrom.childrenChecked == len(dequeueFrom.queueMap)+1 // must check local queue as well
checkedAllNodes := node.childrenChecked == len(node.queueMap)+1 // must check local queue as well

// advance queue position for dequeue
tqa.tenantOrderIndex++
Expand All @@ -338,8 +338,8 @@ func (tqa *tenantQuerierAssignments) dequeueSelectNode(dequeueFrom *Node) (*Node
}

// no children
if len(dequeueFrom.queueMap) == 0 || tqa.tenantOrderIndex == localQueueIndex {
return dequeueFrom, checkedAllNodes
if len(node.queueMap) == 0 || tqa.tenantOrderIndex == localQueueIndex {
return node, checkedAllNodes
}

checkIndex := tqa.tenantOrderIndex
Expand All @@ -351,25 +351,25 @@ func (tqa *tenantQuerierAssignments) dequeueSelectNode(dequeueFrom *Node) (*Node
tenantID := tqa.tenantIDOrder[checkIndex]
tenantName := string(tenantID)

if _, ok := dequeueFrom.queueMap[tenantName]; !ok {
if _, ok := node.queueMap[tenantName]; !ok {
// tenant not in _this_ node's children, move on
checkIndex++
continue
}

// increment nodes checked even if not in tenant-querier map
dequeueFrom.childrenChecked++
checkedAllNodes = dequeueFrom.childrenChecked == len(dequeueFrom.queueMap)+1
node.childrenChecked++
checkedAllNodes = node.childrenChecked == len(node.queueMap)+1

// if the tenant-querier set is nil, any querier can serve this tenant
if tqa.tenantQuerierIDs[tenantID] == nil {
tqa.tenantOrderIndex = checkIndex
return dequeueFrom.queueMap[tenantName], checkedAllNodes
return node.queueMap[tenantName], checkedAllNodes
}
if tenantQuerierSet, ok := tqa.tenantQuerierIDs[tenantID]; ok {
if _, ok := tenantQuerierSet[*tqa.currentQuerier]; ok {
tqa.tenantOrderIndex = checkIndex
return dequeueFrom.queueMap[tenantName], checkedAllNodes
return node.queueMap[tenantName], checkedAllNodes
}
}
checkIndex++
Expand All @@ -385,7 +385,7 @@ func (tqa *tenantQuerierAssignments) dequeueSelectNode(dequeueFrom *Node) (*Node
//
// dequeueUpdateState would normally also handle incrementing the queue position after performing a dequeue, but
// tenantQuerierAssignments currently expects the caller to handle this by having the querier set tenantOrderIndex.
func (tqa *tenantQuerierAssignments) dequeueUpdateState(parent *Node, dequeuedFrom *Node) {
func (tqa *tenantQuerierAssignments) dequeueUpdateState(node *Node, dequeuedFrom *Node) {
// if dequeuedFrom is nil or is not empty, we don't need to do anything;
// position updates will be handled by the caller, and we don't need to remove any nodes.
if dequeuedFrom == nil || !dequeuedFrom.IsEmpty() {
Expand All @@ -394,7 +394,7 @@ func (tqa *tenantQuerierAssignments) dequeueUpdateState(parent *Node, dequeuedFr

// delete from the node's children
childName := dequeuedFrom.Name()
delete(parent.queueMap, childName)
delete(node.queueMap, childName)

// delete from shared tenantNodes
for i, tenantNode := range tqa.tenantNodes[childName] {
Expand Down
34 changes: 17 additions & 17 deletions pkg/scheduler/queue/tree_queueing_algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package queue
// that may be stored in a struct that implements DequeueAlgorithm.
type DequeueAlgorithm interface {
addChildNode(parent, child *Node)
dequeueSelectNode(dequeueFrom *Node) (*Node, bool)
dequeueUpdateState(dequeueParent *Node, dequeuedFrom *Node)
dequeueSelectNode(node *Node) (*Node, bool)
dequeueUpdateState(node *Node, dequeuedFrom *Node)
}

// roundRobinState is the simplest type of DequeueAlgorithm; nodes which use this DequeueAlgorithm and are at
Expand Down Expand Up @@ -38,14 +38,14 @@ func (rrs *roundRobinState) addChildNode(parent, child *Node) {

// dequeueGetNode returns the node at the node's queuePosition. queuePosition represents the position of
// the next node to dequeue from, and is incremented in dequeueUpdateState.
func (rrs *roundRobinState) dequeueSelectNode(dequeueFrom *Node) (*Node, bool) {
checkedAllNodes := dequeueFrom.childrenChecked == len(dequeueFrom.queueOrder)+1
if dequeueFrom.queuePosition == localQueueIndex {
return dequeueFrom, checkedAllNodes
func (rrs *roundRobinState) dequeueSelectNode(node *Node) (*Node, bool) {
checkedAllNodes := node.childrenChecked == len(node.queueOrder)+1
if node.queuePosition == localQueueIndex {
return node, checkedAllNodes
}

currentNodeName := dequeueFrom.queueOrder[dequeueFrom.queuePosition]
if node, ok := dequeueFrom.queueMap[currentNodeName]; ok {
currentNodeName := node.queueOrder[node.queuePosition]
if node, ok := node.queueMap[currentNodeName]; ok {
return node, checkedAllNodes
}
return nil, checkedAllNodes
Expand All @@ -56,32 +56,32 @@ func (rrs *roundRobinState) dequeueSelectNode(dequeueFrom *Node) (*Node, bool) {
// - increments queuePosition if no child was deleted
// - updates childrenChecked to reflect the number of children checked so that the dequeueGetNode operation
// will stop once it has checked all possible nodes.
func (rrs *roundRobinState) dequeueUpdateState(parent *Node, dequeuedFrom *Node) {
func (rrs *roundRobinState) dequeueUpdateState(node *Node, dequeuedFrom *Node) {
// if the child node is nil, we haven't done anything to the tree; return early
if dequeuedFrom == nil {
return
}

// corner case: if parent == dequeuedFrom and is empty, we will try to delete a "child" (no-op),
// and won't increment position. This is fine, because if the parent is empty, there's nothing to
// corner case: if node == dequeuedFrom and is empty, we will try to delete a "child" (no-op),
// and won't increment position. This is fine, because if the node is empty, there's nothing to
// increment to.
childIsEmpty := dequeuedFrom.IsEmpty()

// if the child is empty, we should delete it, but not increment queue position, since removing an element
// from queueOrder sets our position to the next element already.
if childIsEmpty {
childName := dequeuedFrom.Name()
delete(parent.queueMap, childName)
for idx, name := range parent.queueOrder {
delete(node.queueMap, childName)
for idx, name := range node.queueOrder {
if name == childName {
parent.queueOrder = append(parent.queueOrder[:idx], parent.queueOrder[idx+1:]...)
node.queueOrder = append(node.queueOrder[:idx], node.queueOrder[idx+1:]...)
}
}

} else {
parent.queuePosition++
node.queuePosition++
}
if parent.queuePosition >= len(parent.queueOrder) {
parent.queuePosition = localQueueIndex
if node.queuePosition >= len(node.queueOrder) {
node.queuePosition = localQueueIndex
}
}
3 changes: 0 additions & 3 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type connectedFrontend struct {
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"`
AdditionalQueryQueueDimensionsEnabled bool `yaml:"additional_query_queue_dimensions_enabled" category:"experimental"`
PrioritizeQueryComponents bool `yaml:"prioritize_query_components" category:"experimental"`
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
Expand All @@ -105,7 +104,6 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
f.BoolVar(&cfg.AdditionalQueryQueueDimensionsEnabled, "query-scheduler.additional-query-queue-dimensions-enabled", false, "Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)")
f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "Prioritize rotation through query components over rotation through tenants during dequeueing.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
Expand Down Expand Up @@ -158,7 +156,6 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
s.log,
cfg.MaxOutstandingPerTenant,
cfg.AdditionalQueryQueueDimensionsEnabled,
cfg.PrioritizeQueryComponents,
cfg.QuerierForgetDelay,
s.queueLength,
s.discardedRequests,
Expand Down

0 comments on commit c46f9ff

Please sign in to comment.