-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemit.go
143 lines (122 loc) · 2.36 KB
/
emit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package emit
import (
"errors"
"sync"
"time"
)
// key is the zero-size type whose value key[T]{} is used as the map key under
// which subscriptions for event type T are stored. Every instantiation key[T]
// is a distinct type, so keys for different T never collide in a map[any].
type key[T any] struct{}
// Emitter fans typed events out to the subscribers registered with On.
//
// The zero value is ready to use. An Emitter contains a mutex and therefore
// must not be copied after first use.
type Emitter struct {
	// subs maps an event-type key (key[T]{}) to the channels of every
	// subscriber for T. It is created lazily by init.
	subs map[any][]chan any
	// mu guards subs.
	mu sync.RWMutex
}

// Close closes every subscriber channel and then forgets all subscriptions.
//
// Forgetting them is what makes Close safe to call more than once and keeps a
// later Emit from sending on an already closed channel; either would panic
// otherwise. Calling Close on a nil Emitter is a no-op.
func (e *Emitter) Close() {
	if e == nil {
		return
	}

	e.mu.Lock()
	defer e.mu.Unlock()

	for _, chans := range e.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	// Closed channels must not stay reachable: closing or sending on them
	// again panics. init recreates the map if a new subscriber is added.
	e.subs = nil
}

// init lazily allocates the subscription map. The caller must hold e.mu for
// writing.
func (e *Emitter) init() {
	if e.subs == nil {
		e.subs = make(map[any][]chan any)
	}
}
// Emit offers v to every subscriber currently registered for type T.
//
// Delivery is best effort and never blocks: a subscriber that is not ready to
// receive at this moment simply misses the value. The only error returned is
// for a nil emitter.
func Emit[T any](e *Emitter, v T) error {
	if e == nil {
		return errors.New("emitter is nil")
	}

	e.mu.RLock()
	defer e.mu.RUnlock()

	// A missing key yields a nil slice, and ranging over it is a no-op, so no
	// separate presence check is needed.
	for _, sub := range e.subs[key[T]{}] {
		select {
		case sub <- v:
			// delivered
		default:
			// receiver busy: value dropped
		}
	}
	return nil
}
// OnOptions holds the per-subscription settings that On hands to its
// collector. Populate it through the With* option functions.
type OnOptions struct {
	// batchSize is the number of events gathered before the handler is called;
	// values <= 1 deliver every event on its own.
	batchSize int
	// timeout is how long a non-empty, not yet full batch may wait before it is
	// delivered anyway; values <= 0 disable the timer.
	timeout time.Duration
}
// WithTimeout returns an option that sets the subscription's timeout to
// maxTimeout.
func WithTimeout(maxTimeout time.Duration) func(*OnOptions) {
	apply := func(o *OnOptions) { o.timeout = maxTimeout }
	return apply
}
// WithBatchSize returns an option that sets the subscription's batch size to
// maxBatchSize.
func WithBatchSize(maxBatchSize int) func(*OnOptions) {
	apply := func(o *OnOptions) { o.batchSize = maxBatchSize }
	return apply
}
// On subscribes fn to events of type T emitted on e.
//
// The options are applied in order to a fresh OnOptions. The subscription gets
// its own unbuffered channel, registered under key[T]{}, and a goroutine
// running collect that feeds fn from that channel using the configured batch
// size and timeout.
func On[T any](e *Emitter, fn func([]T), options ...func(*OnOptions)) {
	var cfg OnOptions
	for _, apply := range options {
		apply(&cfg)
	}

	e.mu.Lock()
	defer e.mu.Unlock()
	e.init()

	k := key[T]{}
	subs, found := e.subs[k]
	if !found {
		// First subscriber for T: start with room for three.
		subs = make([]chan any, 0, 3)
	}

	events := make(chan any)
	e.subs[k] = append(subs, events)

	go collect(events, cfg.batchSize, cfg.timeout, fn)
}
// collect drains ch on behalf of one subscriber and hands the values whose
// dynamic type is T to fn. Values of any other type are ignored. It returns
// once ch is closed.
//
// With batchSize <= 1 every value is delivered immediately as a one-element
// slice and timeout is not used. Otherwise values are accumulated and fn is
// called when
//   - the batch holds batchSize values,
//   - timeout (if > 0) elapses while the batch is non-empty, or
//   - ch is closed while values are still pending, so nothing already
//     received is lost on shutdown.
func collect[T any](ch <-chan any, batchSize int, timeout time.Duration, fn func([]T)) {
	if batchSize <= 1 {
		for v := range ch {
			if item, ok := v.(T); ok {
				fn([]T{item})
			}
		}
		return
	}

	// Without a timeout tick stays nil; a nil channel is never ready, so the
	// tick case below is simply never selected.
	var (
		ticker *time.Ticker
		tick   <-chan time.Time
	)
	if timeout > 0 {
		ticker = time.NewTicker(timeout)
		defer ticker.Stop()
		tick = ticker.C
	}

	batch := make([]T, 0, batchSize)

	// flush delivers the pending batch, if any, and starts a new one.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		fn(batch)
		// fn may retain the slice, so allocate a fresh one instead of reusing
		// the backing array.
		batch = make([]T, 0, batchSize)
		if ticker != nil {
			// Restart the period so the next timeout is measured from this
			// delivery.
			ticker.Reset(timeout)
		}
	}

	for {
		select {
		case <-tick:
			flush()
		case v, open := <-ch:
			if !open {
				// Deliver what was already received before shutting down.
				flush()
				return
			}
			if item, ok := v.(T); ok {
				batch = append(batch, item)
			}
			if len(batch) == batchSize {
				flush()
			}
		}
	}
}