Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
chencs committed Jun 11, 2024
1 parent f5f461a commit 388fdb6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 37 deletions.
24 changes: 12 additions & 12 deletions pkg/scheduler/queue/tenant_querier_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,13 @@ func (tqa *tenantQuerierAssignments) shuffleTenantQueriers(tenantID TenantID, sc
//
// Note that because we use the shared tenantIDOrder and tenantOrderIndex to manage the queue, we functionally
// ignore each Node's individual queueOrder and queuePosition.
func (tqa *tenantQuerierAssignments) dequeueGetNode(n *Node) (*Node, bool) {
func (tqa *tenantQuerierAssignments) dequeueSelectNode(dequeueFrom *Node) (*Node, bool) {
// can't get a tenant if no querier set
if tqa.currentQuerier == nil {
return nil, true
}

checkedAllNodes := n.childrenChecked == len(n.queueMap)+1 // must check local queue as well
checkedAllNodes := dequeueFrom.childrenChecked == len(dequeueFrom.queueMap)+1 // must check local queue as well

// advance queue position for dequeue
tqa.tenantOrderIndex++
Expand All @@ -338,8 +338,8 @@ func (tqa *tenantQuerierAssignments) dequeueGetNode(n *Node) (*Node, bool) {
}

// no children
if len(n.queueMap) == 0 || tqa.tenantOrderIndex == localQueueIndex {
return n, checkedAllNodes
if len(dequeueFrom.queueMap) == 0 || tqa.tenantOrderIndex == localQueueIndex {
return dequeueFrom, checkedAllNodes
}

checkIndex := tqa.tenantOrderIndex
Expand All @@ -351,25 +351,25 @@ func (tqa *tenantQuerierAssignments) dequeueGetNode(n *Node) (*Node, bool) {
tenantID := tqa.tenantIDOrder[checkIndex]
tenantName := string(tenantID)

if _, ok := n.queueMap[tenantName]; !ok {
if _, ok := dequeueFrom.queueMap[tenantName]; !ok {
// tenant not in _this_ node's children, move on
checkIndex++
continue
}

// increment nodes checked even if not in tenant-querier map
n.childrenChecked++
checkedAllNodes = n.childrenChecked == len(n.queueMap)+1
dequeueFrom.childrenChecked++
checkedAllNodes = dequeueFrom.childrenChecked == len(dequeueFrom.queueMap)+1

// if the tenant-querier set is nil, any querier can serve this tenant
if tqa.tenantQuerierIDs[tenantID] == nil {
tqa.tenantOrderIndex = checkIndex
return n.queueMap[tenantName], checkedAllNodes
return dequeueFrom.queueMap[tenantName], checkedAllNodes
}
if tenantQuerierSet, ok := tqa.tenantQuerierIDs[tenantID]; ok {
if _, ok := tenantQuerierSet[*tqa.currentQuerier]; ok {
tqa.tenantOrderIndex = checkIndex
return n.queueMap[tenantName], checkedAllNodes
return dequeueFrom.queueMap[tenantName], checkedAllNodes
}
}
checkIndex++
Expand All @@ -379,14 +379,14 @@ func (tqa *tenantQuerierAssignments) dequeueGetNode(n *Node) (*Node, bool) {

// dequeueUpdateState updates the node state to reflect the number of children that have been checked
// for dequeueable elements, so that dequeueGetNode will stop checking if it has checked all possible nodes.
func (tqa *tenantQuerierAssignments) dequeueUpdateState(n *Node, v any, _ bool) {
func (tqa *tenantQuerierAssignments) dequeueUpdateState(dequeuedFrom *Node, v any, _ bool) {
// we need to reset our checked nodes if we found a value to dequeue
if v != nil {
n.childrenChecked = 0
dequeuedFrom.childrenChecked = 0
return
}

n.childrenChecked++
dequeuedFrom.childrenChecked++
}

// addChildNode adds a child to:
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func newQueueBroker(
// with query components at one level above tenants; if it is false,
// tenant nodes will each maintain their own query component subtree.
tree, err := NewTree(
tqas, // root
&roundRobinState{}, // tenant queues
&roundRobinState{}, // query components
tqas, // root; DequeueAlgorithm selects tenants
&roundRobinState{}, // tenant queues; DequeueAlgorithm selects query component
&roundRobinState{}, // query components; DequeueAlgorithm selects query from local queue
)
if prioritizeQueryComponents {
tree, err = NewTree(
&roundRobinState{}, // root
tqas, // query components
&roundRobinState{}, // tenant queues
&roundRobinState{}, // root; DequeueAlgorithm selects query component
tqas, // query components; DequeueAlgorithm selects tenant
&roundRobinState{}, // tenant queues; DequeueAlgorithm selects query from local queue
)
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/scheduler/queue/tree_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (n *Node) dequeue() (QueuePath, any) {
var dequeueNode *Node
// continue until we've found a value or checked all nodes that need checking
for v == nil && !checkedAllNodes {
dequeueNode, checkedAllNodes = n.dequeueAlgorithm.dequeueGetNode(n)
dequeueNode, checkedAllNodes = n.dequeueAlgorithm.dequeueSelectNode(n)
var deletedNode bool
// dequeuing from local queue
if dequeueNode == n {
Expand All @@ -218,8 +218,6 @@ func (n *Node) dequeue() (QueuePath, any) {
childPath, v = dequeueNode.dequeue()
// if the dequeue node is empty _after_ dequeuing, delete it from children
if dequeueNode.IsEmpty() {
// removing an element sets our position one step forward;
// tell state to reset it to original queuePosition
delete(n.queueMap, dequeueNode.Name())
deletedNode = n.dequeueAlgorithm.deleteChildNode(n, dequeueNode)
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/scheduler/queue/tree_queueing_algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ package queue
// at the layer-level -- every Node at the same depth in a TreeQueue shares the same DequeueAlgorithm, including state
// that may be stored in a struct that implements DequeueAlgorithm.
type DequeueAlgorithm interface {
addChildNode(*Node, *Node)
deleteChildNode(*Node, *Node) bool
dequeueGetNode(*Node) (*Node, bool)
dequeueUpdateState(*Node, any, bool)
addChildNode(parent, child *Node)
deleteChildNode(parent, child *Node) bool
dequeueSelectNode(dequeueFrom *Node) (*Node, bool)
dequeueUpdateState(dequeuedFrom *Node, v any, nodeDeleted bool)
}

// roundRobinState is the simplest type of DequeueAlgorithm; nodes which use this DequeueAlgorithm and are at
Expand Down Expand Up @@ -54,14 +54,14 @@ func (rrs *roundRobinState) deleteChildNode(parent, child *Node) bool {

// dequeueGetNode returns the node at the node's queuePosition. queuePosition represents the position of
// the next node to dequeue from, and is incremented in dequeueUpdateState.
func (rrs *roundRobinState) dequeueGetNode(n *Node) (*Node, bool) {
checkedAllNodes := n.childrenChecked == len(n.queueOrder)+1
if n.queuePosition == localQueueIndex {
return n, checkedAllNodes
func (rrs *roundRobinState) dequeueSelectNode(dequeueFrom *Node) (*Node, bool) {
checkedAllNodes := dequeueFrom.childrenChecked == len(dequeueFrom.queueOrder)+1
if dequeueFrom.queuePosition == localQueueIndex {
return dequeueFrom, checkedAllNodes
}

currentNodeName := n.queueOrder[n.queuePosition]
if node, ok := n.queueMap[currentNodeName]; ok {
currentNodeName := dequeueFrom.queueOrder[dequeueFrom.queuePosition]
if node, ok := dequeueFrom.queueMap[currentNodeName]; ok {
return node, checkedAllNodes
}
return nil, checkedAllNodes
Expand All @@ -70,16 +70,17 @@ func (rrs *roundRobinState) dequeueGetNode(n *Node) (*Node, bool) {
// dequeueUpdateState increments queuePosition based on whether a child node was deleted during dequeue,
// and updates childrenChecked to reflect the number of children checked so that the dequeueGetNode operation
// will stop once it has checked all possible nodes.
func (rrs *roundRobinState) dequeueUpdateState(n *Node, v any, deletedNode bool) {
func (rrs *roundRobinState) dequeueUpdateState(dequeuedFrom *Node, v any, deletedNode bool) {
if v != nil {
n.childrenChecked = 0
dequeuedFrom.childrenChecked = 0
} else {
n.childrenChecked++
dequeuedFrom.childrenChecked++
}
// removing an element sets our position one step forward, functionally incrementing position already
if !deletedNode {
n.queuePosition++
dequeuedFrom.queuePosition++
}
if n.queuePosition >= len(n.queueOrder) {
n.queuePosition = localQueueIndex
if dequeuedFrom.queuePosition >= len(dequeuedFrom.queueOrder) {
dequeuedFrom.queuePosition = localQueueIndex
}
}

0 comments on commit 388fdb6

Please sign in to comment.