-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathlog_send_retry_queue.go
75 lines (65 loc) · 1.92 KB
/
log_send_retry_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package tencentcloud_cls_sdk_go
import (
"container/heap"
"sync"
"time"
)
// LogSendRetryQueue holds ProducerBatch values that are waiting to be retried
// later. The heap.Interface methods defined on it order the batches by
// ascending nextRetryMs.
type LogSendRetryQueue struct {
	// batch is the backing slice that container/heap operates on.
	batch []*ProducerBatch
	// mutex is held by sendToRetryQueue and getRetryBatch while they touch batch.
	mutex sync.Mutex
}
// NewRetryQueue returns a pointer to a new, empty LogSendRetryQueue after
// passing it through heap.Init.
func NewRetryQueue() *LogSendRetryQueue {
	queue := &LogSendRetryQueue{}
	heap.Init(queue)
	return queue
}
// sendToRetryQueue adds producerBatch to the retry heap while holding the
// queue mutex. A nil producerBatch is ignored.
func (retryQueue *LogSendRetryQueue) sendToRetryQueue(producerBatch *ProducerBatch) {
	retryQueue.mutex.Lock()
	defer retryQueue.mutex.Unlock()

	if producerBatch == nil {
		return
	}
	heap.Push(retryQueue, producerBatch)
}
// getRetryBatch removes batches from the retry heap and returns them in
// ascending nextRetryMs order. The queue mutex is held for the whole call.
//
// When moverShutDownFlag is true the queue is drained completely. Otherwise
// only batches whose nextRetryMs is strictly less than the current time (as
// reported by GetTimeMs) are returned, and the rest stay queued. The result is
// nil when nothing qualifies.
func (retryQueue *LogSendRetryQueue) getRetryBatch(moverShutDownFlag bool) (producerBatchList []*ProducerBatch) {
	retryQueue.mutex.Lock()
	defer retryQueue.mutex.Unlock()

	if moverShutDownFlag {
		// Shutting down: hand back everything regardless of its retry time.
		for retryQueue.Len() > 0 {
			producerBatchList = append(producerBatchList, heap.Pop(retryQueue).(*ProducerBatch))
		}
		return producerBatchList
	}

	// Read the clock once so every batch is judged against the same cutoff.
	nowMs := GetTimeMs(time.Now().UnixNano())

	// The heap keeps the earliest batch at index 0, so peek at the root instead
	// of popping it and pushing it back when it is not yet due.
	for retryQueue.Len() > 0 && retryQueue.batch[0].nextRetryMs < nowMs {
		producerBatchList = append(producerBatchList, heap.Pop(retryQueue).(*ProducerBatch))
	}
	return producerBatchList
}
// Len reports how many batches are currently queued. It is part of
// heap.Interface.
func (retryQueue *LogSendRetryQueue) Len() int {
	queued := retryQueue.batch
	return len(queued)
}
// Less reports whether the batch at index i has a strictly smaller
// nextRetryMs than the batch at index j. It is part of heap.Interface.
func (retryQueue *LogSendRetryQueue) Less(i, j int) bool {
	left := retryQueue.batch[i].nextRetryMs
	right := retryQueue.batch[j].nextRetryMs
	return left < right
}
// Swap exchanges the batches stored at indexes i and j. It is part of
// heap.Interface.
func (retryQueue *LogSendRetryQueue) Swap(i, j int) {
	items := retryQueue.batch
	items[i], items[j] = items[j], items[i]
}
// Push appends x to the end of the backing slice. It is part of
// heap.Interface. x must be a *ProducerBatch; any other dynamic type makes
// the type assertion panic.
func (retryQueue *LogSendRetryQueue) Push(x interface{}) {
	retryQueue.batch = append(retryQueue.batch, x.(*ProducerBatch))
}
// Pop removes the last element of the backing slice and returns it as a
// *ProducerBatch wrapped in an interface value. It is part of heap.Interface.
// The slice must be non-empty, otherwise the index expression panics.
func (retryQueue *LogSendRetryQueue) Pop() interface{} {
	old := retryQueue.batch
	last := len(old) - 1
	item := old[last]
	// Clear the vacated slot so the backing array no longer references the
	// popped batch and the garbage collector can reclaim it.
	old[last] = nil
	retryQueue.batch = old[:last]
	return item
}