-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathevent_channel.go
100 lines (94 loc) · 2.26 KB
/
event_channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package zero
// FutureEvent 是 ZeroBot 交互式的核心,用于异步获取指定事件
type FutureEvent struct {
Type string
Priority int
Rule []Rule
Block bool
}
// NewFutureEvent 创建一个FutureEvent, 并返回其指针
func NewFutureEvent(typ string, priority int, block bool, rule ...Rule) *FutureEvent {
return &FutureEvent{
Type: typ,
Priority: priority,
Rule: rule,
Block: block,
}
}
// FutureEvent 返回一个 FutureEvent 实例指针,用于获取满足 Rule 的 未来事件
//
// 此 FutureEvent 必然比 Matcher 之优先级少 1
func (m *Matcher) FutureEvent(typ string, rule ...Rule) *FutureEvent {
return &FutureEvent{
Type: typ,
Priority: m.Priority - 1,
Block: m.Block,
Rule: rule,
}
}
// Next 返回一个 chan 用于接收下一个指定事件
//
// 该 chan 必须接收,如需手动取消监听,请使用 Repeat 方法
func (n *FutureEvent) Next() <-chan *Ctx {
ch := make(chan *Ctx, 1)
StoreTempMatcher(&Matcher{
Type: Type(n.Type),
Block: n.Block,
Priority: n.Priority,
Rules: n.Rule,
Engine: defaultEngine,
Handler: func(ctx *Ctx) {
ch <- ctx
close(ch)
},
})
return ch
}
// Repeat 返回一个 chan 用于接收无穷个指定事件,和一个取消监听的函数
//
// 如果没有取消监听,将不断监听指定事件
func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) {
ch, done := make(chan *Ctx, 1), make(chan struct{})
go func() {
defer close(ch)
in := make(chan *Ctx, 1)
matcher := StoreMatcher(&Matcher{
Type: Type(n.Type),
Block: n.Block,
Priority: n.Priority,
Rules: n.Rule,
Engine: defaultEngine,
Handler: func(ctx *Ctx) {
in <- ctx
},
})
for {
select {
case e := <-in:
ch <- e
case <-done:
matcher.Delete()
close(in)
return
}
}
}()
return ch, func() {
close(done)
}
}
// Take 基于 Repeat 封装,返回一个 chan 接收指定数量的事件
//
// 该 chan 对象必须接收,否则将有 goroutine 泄漏,如需手动取消请使用 Repeat
func (n *FutureEvent) Take(num int) <-chan *Ctx {
recv, cancel := n.Repeat()
ch := make(chan *Ctx, num)
go func() {
defer close(ch)
for i := 0; i < num; i++ {
ch <- <-recv
}
cancel()
}()
return ch
}