Implementing generalized tree queue with node state implemented by tenant-querier assignments #7873

Merged
merged 30 commits into from
Jul 5, 2024
Changes from 28 commits
8c0f045
Sketching out new tree queue with shuffle shard nodes
chencs Apr 10, 2024
ee2a804
Add test to illustrate updating tenantQuerierAssignments outside of tree
chencs Apr 10, 2024
fadc1be
pull out dequeue ops
chencs Apr 26, 2024
3cd2481
Deduplicate node logic and extract node state types
chencs May 1, 2024
d2f00a0
Add a test for reassigning tenantQuerierID map value
chencs May 1, 2024
4b155bf
wip
chencs May 28, 2024
26497e9
added back lastTenantIndex concept, and empty tenant placeholders
chencs Jun 3, 2024
10fcce3
actually add queuing algorithms
chencs Jun 3, 2024
92ef094
better way to pull out tenantID
chencs Jun 3, 2024
295b9ae
fixing tests
chencs Jun 4, 2024
461b92d
Port extant, relevant tree queue tests to new Tree, and finish rippin…
chencs Jun 6, 2024
d8a1330
Add config to flip tree to query component -> tenant
chencs Jun 7, 2024
abc4ab0
Add EnqueueFrontByPath tests, makeQueuePath based on prioritizeQueryC…
chencs Jun 10, 2024
d539206
Rename to TreeQueue
chencs Jun 10, 2024
3bae2d6
lint
chencs Jun 10, 2024
ce9491f
PR feedback
chencs Jun 11, 2024
57fadd6
renaming args and removing config flag re: PR feedback
chencs Jun 12, 2024
9be157a
some test cleanup
chencs Jun 12, 2024
2b45713
PR feedback on tests, mostly
chencs Jun 17, 2024
872a80d
Patrick PR feedback: Comment and naming updates
chencs Jun 18, 2024
6d7b9a6
Re-include original tree queue with a config switch
chencs Jun 24, 2024
44a393d
Implement state update fn and bring back TreeQueue-specific tests
chencs Jun 25, 2024
ecc02ea
Update CHANGELOG and flag name
chencs Jun 25, 2024
cd21632
update docs
chencs Jun 25, 2024
c56cdfc
Update docs
chencs Jun 26, 2024
bae99c7
Fix tenant removal on dequeue for legacy TreeQueue
chencs Jul 1, 2024
794c3da
remove unnecessary err check
chencs Jul 1, 2024
a12ded4
Fix CHANGELOG, add clarity to comments, and add nil check for queueEl…
chencs Jul 2, 2024
b0c5dd4
Update docstrings and tests per PR feedback
chencs Jul 2, 2024
da42243
Check tree item count after enqueues in test
chencs Jul 3, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## main / unreleased
* [ENHANCEMENT] Query-scheduler: Introduce `query-scheduler.use-multi-algorithm-query-queue`, which allows use of an experimental queue structure, with no change in external queue behavior. #7873

### Grafana Mimir

11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -15909,6 +15909,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "use_multi_algorithm_query_queue",
"required": false,
"desc": "Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-scheduler.use-multi-algorithm-query-queue",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "querier_forget_delay",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2317,6 +2317,8 @@ Usage of ./cmd/mimir/mimir:
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-query-scheduler.service-discovery-mode string
[experimental] Service discovery mode that query-frontends and queriers use to find query-scheduler instances. When query-scheduler ring-based service discovery is enabled, this option needs to be set on query-schedulers, query-frontends and queriers. Supported values are: dns, ring. (default "dns")
-query-scheduler.use-multi-algorithm-query-queue
[experimental] Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.
-ruler-storage.azure.account-key string
Azure storage account key. If unset, Azure managed identities will be used for authentication instead.
-ruler-storage.azure.account-name string
@@ -1708,6 +1708,12 @@ The `query_scheduler` block configures the query-scheduler.
# CLI flag: -query-scheduler.additional-query-queue-dimensions-enabled
[additional_query_queue_dimensions_enabled: <boolean> | default = false]

# (experimental) Use an experimental version of the query queue which has the
# same behavior as the existing queue, but integrates tenant selection into the
# tree model.
# CLI flag: -query-scheduler.use-multi-algorithm-query-queue
[use_multi_algorithm_query_queue: <boolean> | default = false]

# (experimental) If a querier disconnects without sending notification about
# graceful shutdown, the query-scheduler will keep the querier in the tenant's
# shard until the forget delay has passed. This feature is useful to reduce the
3 changes: 2 additions & 1 deletion pkg/frontend/v1/frontend.go
@@ -128,6 +128,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
log,
cfg.MaxOutstandingPerTenant,
false,
false,
cfg.QuerierForgetDelay,
f.queueLength,
f.discardedRequests,
@@ -251,7 +252,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {

/*
We want to dequeue the next unexpired request from the chosen tenant queue.
- The chance of choosing a particular tenant for dequeueing is (1/active_tenants).
+ The chance of choosing a particular tenant for dequeuing is (1/active_tenants).
This is problematic under load, especially with other middleware enabled such as
querier.split-by-interval, where one request may fan out into many.
If expired requests aren't exhausted before checking another tenant, it would take
283 changes: 283 additions & 0 deletions pkg/scheduler/queue/multi_queuing_algorithm_tree_queue.go
@@ -0,0 +1,283 @@
// SPDX-License-Identifier: AGPL-3.0-only

package queue

import (
"container/list"
"fmt"
)

type QueuePath []string //nolint:revive // disallows types beginning with package name
type QueueIndex int //nolint:revive // disallows types beginning with package name

const localQueueIndex = -1

type Tree interface {
EnqueueFrontByPath(QueuePath, any) error
EnqueueBackByPath(QueuePath, any) error
Dequeue() (QueuePath, any)
ItemCount() int
IsEmpty() bool
}

// MultiQueuingAlgorithmTreeQueue holds metadata and a pointer to the root node of a hierarchical queue implementation.
// The root Node maintains a localQueue and an arbitrary number of child nodes (which themselves
// may have local queues and children). Each Node in MultiQueuingAlgorithmTreeQueue uses a QueuingAlgorithm (determined by
// node depth) to determine dequeue order of that Node's subtree.
//
// Each queuing dimension is modeled as a node in the tree, internally reachable through a QueuePath.
//
// The QueuePath is an ordered array of strings describing the path from the tree root to a Node.
// In addition to child Nodes, each Node contains a local queue (FIFO) of items.
//
// When dequeuing from a given node, a Node will use its QueuingAlgorithm to choose either itself
// or a child node to dequeue from recursively (i.e., a child Node will use its own QueuingAlgorithm
// to determine how to proceed). MultiQueuingAlgorithmTreeQueue will not dequeue from two different Nodes at the same depth
// consecutively, unless the previously-checked Node was empty down to the leaf node.
type MultiQueuingAlgorithmTreeQueue struct {
rootNode *Node
algosByDepth []QueuingAlgorithm
}

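// NewTree creates a MultiQueuingAlgorithmTreeQueue with one QueuingAlgorithm per tree depth:
// queuingAlgorithms[0] is used by the root node, queuingAlgorithms[1] by its children, and so on.
// The number of algorithms therefore bounds the depth of the tree.
// It returns an error if no algorithms are provided or if the root's algorithm is nil.
func NewTree(queuingAlgorithms ...QueuingAlgorithm) (*MultiQueuingAlgorithmTreeQueue, error) {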
if len(queuingAlgorithms) == 0 {
return nil, fmt.Errorf("cannot create a tree without defined QueuingAlgorithm")
}
root, err := newNode("root", 0, queuingAlgorithms[0])
if err != nil {
return nil, err
}
root.depth = 0
return &MultiQueuingAlgorithmTreeQueue{
rootNode: root,
algosByDepth: queuingAlgorithms,
}, nil
}
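Note on the depth limit: because the tree holds one algorithm per depth and the root sits at depth 0, a tree built with N algorithms accepts paths of at most N-1 segments. The sketch below restates that rule in isolation. `checkPathDepth` is a hypothetical helper written for illustration, not a function in this PR, and it assumes every node on the path still has to be created.

```go
package main

import (
	"errors"
	"fmt"
)

// checkPathDepth is a hypothetical helper (not part of the PR). It mirrors the
// checks made by NewTree and getOrAddNode: a tree needs at least one
// algorithm, and a node at depth d needs an algorithm at index d.
func checkPathDepth(numAlgorithms int, path []string) error {
	if numAlgorithms == 0 {
		return errors.New("cannot create a tree without defined QueuingAlgorithm")
	}
	// The root is at depth 0, so a path with k segments ends at depth k.
	if len(path) >= numAlgorithms {
		return fmt.Errorf("cannot add a node beyond max tree depth: %v", numAlgorithms)
	}
	return nil
}

func main() {
	// Two algorithms: one for the root, one for its direct children.
	fmt.Println(checkPathDepth(2, []string{"tenant-a"}))
	fmt.Println(checkPathDepth(2, []string{"tenant-a", "ingester"}))
}
```

With two algorithms, the one-segment path is accepted and the two-segment path is rejected.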

func (t *MultiQueuingAlgorithmTreeQueue) ItemCount() int {
return t.rootNode.ItemCount()
}

func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool {
return t.rootNode.IsEmpty()
}

// Dequeue removes and returns an item from the front of the next appropriate Node in the MultiQueuingAlgorithmTreeQueue, as
// well as the path to the Node which that item was dequeued from.
//
// Either the root/self node or a child node is chosen according to the Node's QueuingAlgorithm. If
// the root node is chosen, an item will be dequeued from the front of its localQueue. If a child
// node is chosen, it is recursively dequeued from until a node selects its localQueue.
//
// Nodes that empty down to the leaf after being dequeued from (or which are found to be empty leaf
// nodes during the dequeue operation) are deleted as the recursion returns up the stack. This
// maintains structural guarantees relied upon to make IsEmpty() non-recursive.
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue() (QueuePath, any) {
path, v := t.rootNode.dequeue()
// The returned node dequeue path includes the root node; exclude
// this so that the return path can be used if needed to enqueue.
return path[1:], v
}

// EnqueueBackByPath enqueues an item in the back of the local queue of the node
// located at a given path through the tree; nodes for the path are created as needed.
//
// path is relative to the root node; providing a QueuePath beginning with "root"
// will create a child node of the root node which is also named "root."
func (t *MultiQueuingAlgorithmTreeQueue) EnqueueBackByPath(path QueuePath, v any) error {
return t.rootNode.enqueueBackByPath(t, path, v)
}

// EnqueueFrontByPath enqueues an item in the front of the local queue of the Node
// located at a given path through the MultiQueuingAlgorithmTreeQueue; nodes for the path are created as needed.
//
// Enqueueing to the front is intended only for items which were first enqueued to the back
// and then dequeued after reaching the front.
//
// Re-enqueueing to the front is only intended for use in cases where a queue consumer
// fails to complete operations on the dequeued item, but failure is not yet final, and the
// operations should be retried by a subsequent queue consumer. A concrete example is when
// a queue consumer fails or disconnects for unrelated reasons while we are in the process
// of dequeuing a request for it.
//
// path must be relative to the root node; providing a QueuePath beginning with "root"
// will create a child node of root which is also named "root."
func (t *MultiQueuingAlgorithmTreeQueue) EnqueueFrontByPath(path QueuePath, v any) error {
return t.rootNode.enqueueFrontByPath(t, path, v)
}
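Note on front re-enqueueing: the retry case described in the comment above can be sketched on a bare `container/list`, the same type the PR uses for local queues. The names `dequeueForConsumer`, `newQueue` and `errDisconnected` are hypothetical and exist only for this illustration; the real scheduler wiring is not shown in this diff.

```go
package main

import (
	"container/list"
	"errors"
	"fmt"
)

// errDisconnected is a hypothetical error standing in for a consumer failure.
var errDisconnected = errors.New("querier disconnected")

// newQueue is a hypothetical helper that builds a FIFO list from items.
func newQueue(items ...any) *list.List {
	q := list.New()
	for _, item := range items {
		q.PushBack(item)
	}
	return q
}

// dequeueForConsumer is a hypothetical helper (not part of the PR). It removes
// the front item and hands it to deliver. If delivery fails, the item is
// pushed back onto the front so it stays ahead of items enqueued later.
func dequeueForConsumer(q *list.List, deliver func(any) error) (any, error) {
	elt := q.Front()
	if elt == nil {
		return nil, nil // nothing queued
	}
	v := q.Remove(elt)
	if err := deliver(v); err != nil {
		q.PushFront(v)
		return nil, err
	}
	return v, nil
}

func main() {
	q := newQueue("req-1", "req-2")

	// The first consumer fails mid-dequeue, so req-1 returns to the front.
	_, err := dequeueForConsumer(q, func(any) error { return errDisconnected })
	fmt.Println("first attempt:", err)

	// A later consumer receives req-1 rather than req-2.
	v, _ := dequeueForConsumer(q, func(any) error { return nil })
	fmt.Println("second attempt:", v)
}
```

Pushing to the back instead would let `req-2` overtake a request that had already waited its turn, which is the situation the front re-enqueue is meant to avoid.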

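// GetNode returns the Node located at path, relative to the root node,
// or nil if no such Node exists. An empty path returns the root node.
func (t *MultiQueuingAlgorithmTreeQueue) GetNode(path QueuePath) *Node {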
return t.rootNode.getNode(path)
}

// Node maintains node-specific information used to enqueue and dequeue to itself, such as a local
// queue, node depth, references to its children, and position in queue.
// Note that the tenantQuerierAssignments QueuingAlgorithm largely disregards Node's queueOrder and
// queuePosition, managing analogous state instead, because shuffle-sharding + fairness requirements
// necessitate input from the querier.
type Node struct {
name string
localQueue *list.List
queuePosition int // next index in queueOrder to dequeue from
queueOrder []string // order for dequeuing from self/children
queueMap map[string]*Node
depth int
queuingAlgorithm QueuingAlgorithm
childrenChecked int
}

func newNode(name string, depth int, da QueuingAlgorithm) (*Node, error) {
if da == nil {
return nil, fmt.Errorf("cannot create a node without a defined QueuingAlgorithm")
}
return &Node{
name: name,
localQueue: list.New(),
queuePosition: localQueueIndex,
queueOrder: make([]string, 0),
queueMap: make(map[string]*Node, 1),
depth: depth,
queuingAlgorithm: da,
}, nil
}

func (n *Node) IsEmpty() bool {
// avoid recursion to make this a cheap operation
//
// Because we dereference empty child nodes during dequeuing,
// we assume that emptiness means there are no child nodes
// and nothing in this tree node's local queue.
//
// In reality a package member could attach empty child queues with getOrAddNode
// in order to get a functionally-empty tree that would report false for IsEmpty.
// We assume this does not occur or is not relevant during normal operation.
return n.localQueue.Len() == 0 && len(n.queueMap) == 0
}

// ItemCount counts the queue items in the Node and in all its children, recursively.
func (n *Node) ItemCount() int {
items := n.localQueue.Len()
for _, child := range n.queueMap {
items += child.ItemCount()
}
return items
}

func (n *Node) Name() string {
return n.name
}

func (n *Node) getLocalQueue() *list.List {
return n.localQueue
}

func (n *Node) enqueueFrontByPath(tree *MultiQueuingAlgorithmTreeQueue, pathFromNode QueuePath, v any) error {
childNode, err := n.getOrAddNode(pathFromNode, tree)
if err != nil {
return err
}
childNode.localQueue.PushFront(v)
return nil
}

func (n *Node) enqueueBackByPath(tree *MultiQueuingAlgorithmTreeQueue, pathFromNode QueuePath, v any) error {
childNode, err := n.getOrAddNode(pathFromNode, tree)
if err != nil {
return err
}
childNode.localQueue.PushBack(v)
return nil
}

func (n *Node) dequeue() (QueuePath, any) {
var v any
var childPath QueuePath

path := QueuePath{n.name}

if n.IsEmpty() {
return path, nil
}

var checkedAllNodes bool
var dequeueNode *Node
// continue until we've found a value or checked all nodes that need checking
for v == nil && !checkedAllNodes {
dequeueNode, checkedAllNodes = n.queuingAlgorithm.dequeueSelectNode(n)
switch dequeueNode {
// dequeuing from local queue
case n:
if n.localQueue.Len() > 0 {
// dequeueNode is self, local queue non-empty
if elt := n.localQueue.Front(); elt != nil {
n.localQueue.Remove(elt)
v = elt.Value
}
}
// no dequeue-able child found; break out of the loop,
// since we won't find anything to dequeue if we don't
// have a node to dequeue from now
case nil:
checkedAllNodes = true
// dequeue from a child
default:
childPath, v = dequeueNode.dequeue()
}

if v == nil {
n.childrenChecked++
}

n.queuingAlgorithm.dequeueUpdateState(n, dequeueNode)
}
// reset childrenChecked to 0 before completing this dequeue
n.childrenChecked = 0
return append(path, childPath...), v
}

func (n *Node) getNode(pathFromNode QueuePath) *Node {
if len(pathFromNode) == 0 {
return n
}

if n.queueMap == nil {
return nil
}

if childQueue, ok := n.queueMap[pathFromNode[0]]; ok {
return childQueue.getNode(pathFromNode[1:])
}

// no child node matches next path segment
return nil
}

// getOrAddNode recursively gets or adds tree queue nodes based on given relative child path. It
// checks whether the first node in pathFromNode exists in the Node's children; if no node exists,
// one is created and added to the Node's queueOrder, according to the Node's QueuingAlgorithm.
//
// pathFromNode must be relative to the receiver node; providing a QueuePath beginning with
// the receiver/parent node name will create a child node of the same name as the parent.
func (n *Node) getOrAddNode(pathFromNode QueuePath, tree *MultiQueuingAlgorithmTreeQueue) (*Node, error) {
if len(pathFromNode) == 0 {
return n, nil
}

var childNode *Node
var ok bool
var err error
if childNode, ok = n.queueMap[pathFromNode[0]]; !ok {
// child does not exist, create it
if n.depth+1 >= len(tree.algosByDepth) {
return nil, fmt.Errorf("cannot add a node beyond max tree depth: %v", len(tree.algosByDepth))
}
childNode, err = newNode(pathFromNode[0], n.depth+1, tree.algosByDepth[n.depth+1])
if err != nil {
return nil, err
}
// add the newly created child to the node
n.queuingAlgorithm.addChildNode(n, childNode)

}
return childNode.getOrAddNode(pathFromNode[1:], tree)
}
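Note on the overall shape: the QueuingAlgorithm implementations are not part of this file, so the following is only a rough, self-contained sketch of the path-based enqueue, recursive dequeue, and pruning of empty nodes described above. `sketchNode` and its methods are hypothetical names. The sketch hard-codes one simplified policy (drain the local queue first, then round-robin across children) and does not reproduce the PR's tenant-querier assignment logic or its per-depth algorithms.

```go
package main

import (
	"container/list"
	"fmt"
)

// sketchNode is a hypothetical, simplified stand-in for the PR's Node type.
type sketchNode struct {
	name     string
	local    *list.List
	order    []string // round-robin order of child names
	children map[string]*sketchNode
	next     int // index into order of the next child to try
}

func newSketchNode(name string) *sketchNode {
	return &sketchNode{name: name, local: list.New(), children: map[string]*sketchNode{}}
}

// enqueueBack walks path from n, creating nodes as needed, and appends v to
// the local queue of the final node.
func (n *sketchNode) enqueueBack(path []string, v any) {
	if len(path) == 0 {
		n.local.PushBack(v)
		return
	}
	child, ok := n.children[path[0]]
	if !ok {
		child = newSketchNode(path[0])
		n.children[path[0]] = child
		n.order = append(n.order, path[0])
	}
	child.enqueueBack(path[1:], v)
}

// isEmpty is non-recursive; it relies on dequeue pruning empty children.
func (n *sketchNode) isEmpty() bool {
	return n.local.Len() == 0 && len(n.children) == 0
}

// dequeue returns the path relative to n (excluding n's own name) and the
// dequeued value, or (nil, nil) if the subtree holds no items.
func (n *sketchNode) dequeue() ([]string, any) {
	if elt := n.local.Front(); elt != nil {
		return nil, n.local.Remove(elt)
	}
	for len(n.order) > 0 {
		if n.next >= len(n.order) {
			n.next = 0
		}
		name := n.order[n.next]
		child := n.children[name]
		path, v := child.dequeue()
		if child.isEmpty() {
			// Prune the empty child; its successor shifts into index n.next.
			delete(n.children, name)
			n.order = append(n.order[:n.next], n.order[n.next+1:]...)
		} else {
			n.next++
		}
		if v != nil {
			return append([]string{name}, path...), v
		}
	}
	return nil, nil
}

func main() {
	root := newSketchNode("root")
	root.enqueueBack([]string{"tenant-a"}, "a1")
	root.enqueueBack([]string{"tenant-a"}, "a2")
	root.enqueueBack([]string{"tenant-b"}, "b1")
	for !root.isEmpty() {
		path, v := root.dequeue()
		fmt.Println(path, v)
	}
}
```

In this sketch the returned path already excludes the root's name, so it can be passed straight back to `enqueueBack`; the PR achieves the same by trimming the first path element in `Dequeue`.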