package queue

import (
	"context"
	"sync"
	"time"

	"github.com/grafana/dskit/services"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/atomic"
)
const (
	// How frequently to check for disconnected queriers that should be forgotten.
	forgetCheckPeriod = 5 * time.Second
)

var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
)
// UserIndex is an opaque type that allows resuming iteration over users between successive calls
// of the RequestQueue.GetNextRequestForQuerier method.
type UserIndex struct {
	last int
}
// ReuseLastUser modifies the index so that iteration restarts at the user whose queue was returned last.
func (ui UserIndex) ReuseLastUser() UserIndex {
	if ui.last >= 0 {
		return UserIndex{last: ui.last - 1}
	}
	return ui
}
// FirstUser returns a UserIndex that starts iteration over user queues from the very first user.
func FirstUser() UserIndex {
	return UserIndex{last: -1}
}
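
// Illustrative sketch (not part of the original file): how a querier is
// expected to drive the two index helpers above. rq, querierID, isExpired
// and handle are hypothetical names used only for the example.
//
//	last := FirstUser()
//	for {
//		req, idx, err := rq.GetNextRequestForQuerier(ctx, last, querierID)
//		if err != nil {
//			return err
//		}
//		last = idx
//		if isExpired(req) {
//			// Retry the same user's queue instead of advancing to the next user.
//			last = last.ReuseLastUser()
//			continue
//		}
//		handle(req)
//	}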
// Request stored into the queue.
type Request interface{}

// RequestQueue holds incoming requests in per-user queues. It also assigns each user a specified number of queriers,
// and when a querier asks for the next request to handle (using GetNextRequestForQuerier), it returns requests
// in a fair fashion.
type RequestQueue struct {
	services.Service

	connectedQuerierWorkers *atomic.Int32

	mtx     sync.Mutex
	cond    contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
	queues  *queues
	stopped bool

	queueLength       *prometheus.GaugeVec   // Per user and reason.
	discardedRequests *prometheus.CounterVec // Per user.
}
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
	q := &RequestQueue{
		queues:                  newUserQueues(maxOutstandingPerTenant, forgetDelay),
		connectedQuerierWorkers: atomic.NewInt32(0),
		queueLength:             queueLength,
		discardedRequests:       discardedRequests,
	}

	q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
	q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")

	return q
}
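
// Illustrative sketch (not part of the original file): constructing and
// starting the queue. The metric names below are hypothetical; the single
// "user" label matches the WithLabelValues(userID) calls in this file.
//
//	queueLength := prometheus.NewGaugeVec(prometheus.GaugeOpts{
//		Name: "example_queue_length",
//	}, []string{"user"})
//	discarded := prometheus.NewCounterVec(prometheus.CounterOpts{
//		Name: "example_discarded_requests_total",
//	}, []string{"user"})
//	rq := NewRequestQueue(100, 10*time.Second, queueLength, discarded)
//	// RequestQueue embeds a dskit service; it must be started so the
//	// forget-disconnected-queriers timer runs.
//	err := services.StartAndAwaitRunning(context.Background(), rq)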
// EnqueueRequest puts the request into the queue. maxQueriers is a user-specific value that specifies how many queriers
// this user can use (zero or negative = all queriers). It is passed on each call to EnqueueRequest because it can change
// between calls.
//
// If the request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	if q.stopped {
		return ErrStopped
	}

	queue := q.queues.getOrAddQueue(userID, maxQueriers)
	if queue == nil {
		// This can only happen if userID is "".
		return errors.New("no queue found")
	}

	select {
	case queue <- req:
		q.queueLength.WithLabelValues(userID).Inc()
		q.cond.Broadcast()
		// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
		if successFn != nil {
			successFn()
		}
		return nil
	default:
		q.discardedRequests.WithLabelValues(userID).Inc()
		return ErrTooManyRequests
	}
}
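
// Illustrative sketch (not part of the original file): enqueueing with
// backpressure handling. Mapping ErrTooManyRequests to HTTP 429 is an
// assumption about what a caller might do, not something this file dictates.
//
//	err := rq.EnqueueRequest("user-1", req, 0 /* zero = all queriers */, func() {
//		// Runs under the queue lock: record any bookkeeping that must exist
//		// before a querier can dequeue the request.
//	})
//	if errors.Is(err, ErrTooManyRequests) {
//		// e.g. respond with HTTP 429 to shed load.
//	}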
// GetNextRequestForQuerier finds the next user queue and takes the next request off of it. It blocks if there are no requests.
// By passing the user index from the previous call of this method, the querier guarantees that it iterates over all users fairly.
// If the querier finds that a request from the user has already expired, it can get another request for the same user by using UserIndex.ReuseLastUser.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error) {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	querierWait := false

FindQueue:
	// We need to wait if there are no users, or no pending requests for given querier.
	for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped {
		querierWait = false
		q.cond.Wait(ctx)
	}

	if q.stopped {
		return nil, last, ErrStopped
	}

	if err := ctx.Err(); err != nil {
		return nil, last, err
	}

	for {
		queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
		last.last = idx
		if queue == nil {
			break
		}

		// Pick next request from the queue.
		for {
			request := <-queue
			if len(queue) == 0 {
				q.queues.deleteQueue(userID)
			}

			q.queueLength.WithLabelValues(userID).Dec()

			// Tell close() we've processed a request.
			q.cond.Broadcast()

			return request, last, nil
		}
	}

	// There are no unexpired requests, so we can get back
	// and wait for more requests.
	querierWait = true
	goto FindQueue
}
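
// Illustrative sketch (not part of the original file): a querier worker loop
// around GetNextRequestForQuerier, including connection registration so that
// stopping() keeps dispatching queued requests while workers remain
// connected. process is a hypothetical helper.
//
//	rq.RegisterQuerierConnection(querierID)
//	defer rq.UnregisterQuerierConnection(querierID)
//
//	last := FirstUser()
//	for ctx.Err() == nil {
//		req, idx, err := rq.GetNextRequestForQuerier(ctx, last, querierID)
//		if err != nil {
//			return err // ErrStopped or a context error
//		}
//		last = idx
//		process(req)
//	}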
func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
		// We need to notify goroutines because removing some queriers
		// may have caused a resharding.
		q.cond.Broadcast()
	}

	return nil
}
func (q *RequestQueue) stopping(_ error) error {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
		q.cond.Wait(context.Background())
	}

	// Only stop after dispatching enqueued requests.
	q.stopped = true

	// Any goroutines still blocked in GetNextRequestForQuerier get notified.
	q.cond.Broadcast()

	return nil
}
func (q *RequestQueue) RegisterQuerierConnection(querier string) {
	q.connectedQuerierWorkers.Inc()

	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.addQuerierConnection(querier)
}

func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
	q.connectedQuerierWorkers.Dec()

	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.removeQuerierConnection(querier, time.Now())
}

func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.notifyQuerierShutdown(querierID)
}

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
	return float64(q.connectedQuerierWorkers.Load())
}
// contextCond is a *sync.Cond with its Wait() method overridden to support context-based waiting.
type contextCond struct {
	*sync.Cond

	// testHookBeforeWaiting is called before calling Cond.Wait() if it's not nil.
	// Yes, it's ugly, but the http package settled jurisprudence:
	// https://github.com/golang/go/blob/6178d25fc0b28724b1b5aec2b1b74fc06d9294c7/src/net/http/client.go#L596-L601
	testHookBeforeWaiting func()
}
// Wait does c.cond.Wait() but will also return if the provided context is done.
// All the documentation of sync.Cond.Wait() applies, but it's especially important to remember that the mutex of
// the cond should be held while Wait() is called (and the mutex will be held again once it returns).
func (c contextCond) Wait(ctx context.Context) {
	// "condWait" goroutine does q.cond.Wait() and signals through condWait channel.
	condWait := make(chan struct{})
	go func() {
		if c.testHookBeforeWaiting != nil {
			c.testHookBeforeWaiting()
		}
		c.Cond.Wait()
		close(condWait)
	}()

	// "waiting" goroutine: signals that the condWait goroutine has started waiting.
	// Notice that a closed waiting channel implies that the goroutine above has started waiting
	// (because it has unlocked the mutex), but the other way around is not true:
	// - condWait may have unlocked and be waiting, but someone else locked the mutex faster than us:
	//   in this case that caller will eventually unlock, and we'll be able to enter here.
	// - condWait called Wait(), unlocked, received a broadcast and locked again faster than we were able to lock here:
	//   in this case the condWait channel will be closed, and this goroutine will be waiting until we unlock.
	waiting := make(chan struct{})
	go func() {
		c.L.Lock()
		close(waiting)
		c.L.Unlock()
	}()

	select {
	case <-condWait:
		// We don't know whether the waiting goroutine is done or not, but we don't care:
		// it will be done once nobody is fighting for the mutex anymore.
	case <-ctx.Done():
		// In order to avoid leaking the condWait goroutine, we can send a broadcast.
		// Before sending the broadcast we need to make sure that the condWait goroutine is already waiting (or has already waited).
		select {
		case <-condWait:
			// No need to broadcast as q.cond.Wait() has returned already.
			return
		case <-waiting:
			// q.cond.Wait() might still be waiting (or maybe not!), so we'll poke it just in case.
			c.Broadcast()
		}

		// Make sure we are not waiting anymore; we need to do that before returning as the caller will need to unlock the mutex.
		<-condWait
	}
}
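
// Illustrative sketch (not part of the original file): contextCond is used
// exactly like sync.Cond, with the mutex held around Wait; the only addition
// is that Wait also returns once ctx is done.
//
//	var mtx sync.Mutex
//	cc := contextCond{Cond: sync.NewCond(&mtx)}
//
//	mtx.Lock()
//	cc.Wait(ctx) // returns on Broadcast/Signal, or when ctx is cancelled
//	mtx.Unlock()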