-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathring.go
76 lines (70 loc) · 1.64 KB
/
ring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package zero
import (
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// eventRing is a fixed-size ring of pending events. Events are written by
// processEvent and consumed by the goroutine started by loop.
type eventRing struct {
// Embedded mutex; processEvent holds it while it writes a new event.
sync.Mutex
// c is incremented once per processEvent call; c % len(r) selects the
// slot in r that the next event is published to.
c uintptr
// r is the ring of slots. Each slot is either nil or points at an entry
// of p. Slots are written with atomic stores by processEvent and are
// atomically loaded and reset to nil by loop.
r []*eventRingItem
// i is incremented once per processEvent call; i % len(p) selects the
// entry of p that the next event is written into.
i uintptr
// p is the backing storage for the items that the slots in r point to.
p []eventRingItem
}
// eventRingItem is one queued event: the pair of values that loop later
// passes to its process callback. loop resets both fields to nil when it
// consumes the event.
type eventRingItem struct {
// response is the byte payload given to processEvent.
response []byte
// caller is the APICaller given to processEvent alongside response.
caller APICaller
}
// newring builds an eventRing with ringLen slots.
//
// The item storage p is allocated with ringLen+1 entries, one more than the
// slot ring r. processEvent advances both cursors together, so a given slot
// is paired with the same storage entry only once every ringLen*(ringLen+1)
// events.
//
// A ringLen of 0 is raised to 1: processEvent and loop index the ring with a
// modulo by len(r), which panics with an integer divide by zero when the
// ring is empty.
func newring(ringLen uint) eventRing {
	if ringLen == 0 {
		ringLen = 1
	}
	return eventRing{
		r: make([]*eventRingItem, ringLen),
		p: make([]eventRingItem, ringLen+1),
	}
}
// processEvent synchronously places one event into the ring.
//
// While holding the ring's mutex it writes response and caller into the
// storage entry selected by the i cursor, publishes a pointer to that entry
// in the slot selected by the c cursor using an atomic store, and then
// advances both cursors. The previous content of the slot is overwritten
// without being inspected.
//
//go:nosplit
func (evr *eventRing) processEvent(response []byte, caller APICaller) {
	evr.Lock()
	defer evr.Unlock()

	// Resolve both positions up front, slot first, before touching storage.
	slotIdx := evr.c % uintptr(len(evr.r))
	itemIdx := evr.i % uintptr(len(evr.p))

	item := &evr.p[itemIdx]
	item.response, item.caller = response, caller

	// Publish the filled entry so the consumer's atomic load observes it.
	slot := (*unsafe.Pointer)(unsafe.Pointer(&evr.r[slotIdx]))
	atomic.StorePointer(slot, unsafe.Pointer(item))

	evr.c++
	evr.i++
}
// loop starts a background goroutine that drains the ring, handing at most
// one event to process per tick.
//
// latency is the tick interval, i.e. the delay between handling attempts;
// values below one millisecond are raised to one millisecond. maxwait is not
// interpreted here and is passed through unchanged as the third argument of
// process. process is called on the background goroutine with the response
// and caller that processEvent stored.
//
// Slots are consumed in index order: when the slot under the consumer cursor
// is empty the tick is skipped and the cursor does not advance. After more
// than one second worth of handled ticks has accumulated, runtime.GC is
// called and the accumulator is reset.
//
// The goroutine and its ticker are never stopped; they live as long as the
// program does.
func (evr *eventRing) loop(latency, maxwait time.Duration, process func([]byte, APICaller, time.Duration)) {
	go func(r []*eventRingItem) {
		if latency < time.Millisecond {
			latency = time.Millisecond
		}
		var (
			c    uintptr       // consumer cursor: number of events handled so far
			totl time.Duration // latency accumulated since the last forced GC
		)
		for range time.NewTicker(latency).C {
			i := c % uintptr(len(r))
			slot := (*unsafe.Pointer)(unsafe.Pointer(&r[i]))

			// Detach the event under the same mutex processEvent holds while
			// it writes. Reading and clearing the item without the lock races
			// with the producer reusing that storage entry, and clearing the
			// slot only after process returns would wipe out an event that
			// was published into it in the meantime.
			evr.Lock()
			it := (*eventRingItem)(atomic.LoadPointer(slot))
			if it == nil { // nothing queued at the cursor yet
				evr.Unlock()
				continue
			}
			response, caller := it.response, it.caller
			// Drop the ring's references so the payload can be collected
			// once process is done with it.
			it.response, it.caller = nil, nil
			atomic.StorePointer(slot, nil)
			evr.Unlock()

			// Run the handler outside the lock so producers are never
			// blocked for the duration of a callback.
			process(response, caller, maxwait)

			c++
			totl += latency
			if totl > time.Second {
				totl = 0
				runtime.GC()
			}
		}
	}(evr.r)
}