forked from trustmaster/goflow
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcomponent_test.go
395 lines (350 loc) · 7.2 KB
/
component_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
package flow
import (
"sync"
"testing"
"time"
)
// A component that doubles its int input
type doubler struct {
Component
In <-chan int
Out chan<- int
}
// Doubles the input and sends it to output
func (d *doubler) OnIn(i int) {
d.Out <- i * 2
}
// A constructor that can be used by component registry/factory
func newDoubler() interface{} {
return new(doubler)
}
func init() {
Register("doubler", newDoubler)
}
// Tests a component with single input and single output
func TestSingleInput(t *testing.T) {
d := new(doubler)
in := make(chan int, 10)
out := make(chan int, 10)
d.In = in
d.Out = out
RunProc(d)
for i := 0; i < 10; i++ {
in <- i
i2 := <-out
ix2 := i * 2
if i2 != ix2 {
t.Errorf("%d != %d", i2, ix2)
}
}
// Shutdown the component
close(in)
}
// A component that locks to preserve concurrent modification of its state
type locker struct {
Component
In <-chan int
Out chan<- int
StateLock *sync.Mutex
counter int
sum int
}
// Creates a locker instance. This is required because StateLock must be a pointer
func newLocker() *locker {
l := new(locker)
l.counter = 0
l.sum = 0
l.StateLock = new(sync.Mutex)
return l
}
// A constructor that can be used by component registry/factory
func newLockerConstructor() interface{} {
return newLocker()
}
func init() {
Register("locker", newLockerConstructor)
}
// Simulates long processing and read/write access
func (l *locker) OnIn(i int) {
l.counter++
// Half of the calls will wait to simulate long processing
if l.counter%2 == 0 {
time.Sleep(1000)
}
// Parellel write data race danger is here
l.sum += i
}
func (l *locker) Shutdown() {
// Emit the result and don't close the outport
l.Out <- l.sum
}
// Tests internal state locking feature.
// Run with GOMAXPROCS > 1.
func TestStateLock(t *testing.T) {
l := newLocker()
in := make(chan int, 10)
out := make(chan int, 10)
l.In = in
l.Out = out
RunProc(l)
// Simulate parallel writing and count the sum
sum := 0
for i := 1; i <= 1000; i++ {
in <- i
sum += i
}
// Send the close signal
close(in)
// Get the result and check if it is consistent
sum2 := <-out
if sum2 != sum {
t.Errorf("%d != %d", sum2, sum)
}
}
// Similar to locker, but intended to test ComponentModeSync
type syncLocker struct {
Component
In <-chan int
Out chan<- int
counter int
sum int
}
// Creates a syncLocker instance
func newSyncLocker() *syncLocker {
l := new(syncLocker)
l.counter = 0
l.sum = 0
l.Component.Mode = ComponentModeSync // Change this to ComponentModeAsync and the test will fail
return l
}
// A constructor that can be used by component registry/factory
func newSyncLockerConstructor() interface{} {
return newSyncLocker()
}
func init() {
Register("syncLocker", newSyncLockerConstructor)
}
// Simulates long processing and read/write access
func (l *syncLocker) OnIn(i int) {
l.counter++
// Half of the calls will wait to simulate long processing
if l.counter%2 == 0 {
time.Sleep(1000)
}
// Parellel write data race danger is here
l.sum += i
}
func (l *syncLocker) Shutdown() {
// Emit the result and don't close the outport
l.Out <- l.sum
}
// Tests synchronous process execution feature.
// Run with GOMAXPROCS > 1.
func TestSyncLock(t *testing.T) {
l := newSyncLocker()
in := make(chan int, 10)
out := make(chan int, 10)
l.In = in
l.Out = out
RunProc(l)
// Simulate parallel writing and count the sum
sum := 0
for i := 1; i <= 1000; i++ {
in <- i
sum += i
}
// Send the close signal
close(in)
// Get the result and check if it is consistent
sum2 := <-out
if sum2 != sum {
t.Errorf("%d != %d", sum2, sum)
}
}
// An external variable
var testInitFinFlag int
// Simple component
type initfin struct {
Component
In <-chan int
Out chan<- int
}
// Echo input
func (i *initfin) OnIn(n int) {
// Dependent behavior
if testInitFinFlag == 123 {
i.Out <- n * 2
} else {
i.Out <- n
}
}
// Initialization code, affects a global var
func (i *initfin) Init() {
testInitFinFlag = 123
}
// Finalization code
func (i *initfin) Finish() {
testInitFinFlag = 456
}
// Tests user initialization and finalization functions
func TestInitFinish(t *testing.T) {
// Create and run the component
i := new(initfin)
i.Net = new(Graph)
i.Net.InitGraphState()
i.Net.waitGrp.Add(1)
in := make(chan int)
out := make(chan int)
i.In = in
i.Out = out
RunProc(i)
// Pass a value, the result must be affected by flag state
in <- 2
n2 := <-out
if n2 != 4 {
t.Errorf("%d != %d", n2, 4)
}
// Shut the component down and wait for Finish() code
close(in)
i.Net.waitGrp.Wait()
if testInitFinFlag != 456 {
t.Errorf("%d != %d", testInitFinFlag, 456)
}
}
// A flag to test OnClose
var closeTestFlag int
// A component to test OnClose handlers
type closeTest struct {
Component
In <-chan int
}
// In channel close event handler
func (c *closeTest) OnInClose() {
closeTestFlag = 789
}
// Tests close handler of input ports
func TestClose(t *testing.T) {
c := new(closeTest)
c.Net = new(Graph)
c.Net.InitGraphState()
c.Net.waitGrp.Add(1)
in := make(chan int)
c.In = in
RunProc(c)
in <- 1
close(in)
c.Net.waitGrp.Wait()
if closeTestFlag != 789 {
t.Errorf("%d != %d", closeTestFlag, 789)
}
}
// A flag to test OnClose
var shutdownTestFlag int
// A component to test OnClose handlers
type shutdownTest struct {
Component
In <-chan int
}
// In channel close event handler
func (s *shutdownTest) OnIn(i int) {
shutdownTestFlag = i
}
// Custom shutdown handler
func (s *shutdownTest) Shutdown() {
shutdownTestFlag = 789
}
// Tests close handler of input ports
func TestShutdown(t *testing.T) {
s := new(shutdownTest)
s.Net = new(Graph)
s.Net.InitGraphState()
s.Net.waitGrp.Add(1)
in := make(chan int)
s.In = in
RunProc(s)
in <- 1
close(in)
s.Net.waitGrp.Wait()
if shutdownTestFlag != 789 {
t.Errorf("%d != %d", shutdownTestFlag, 789)
}
}
func TestPoolMode(t *testing.T) {
d := new(doubler)
d.Component.Mode = ComponentModePool
d.Component.PoolSize = 4
in := make(chan int, 20)
out := make(chan int, 20)
d.In = in
d.Out = out
RunProc(d)
for i := 0; i < 10; i++ {
in <- i
}
for i := 0; i < 10; i++ {
i2 := <-out
if i2 < 0 {
t.Errorf("%d < 0", i2)
}
}
// Shutdown the component
close(in)
}
// A component to test manual termination
type stopMe struct {
Component
In <-chan int
Out chan<- int
}
func (s *stopMe) OnIn(i int) {
s.Out <- i * 2
}
func (s *stopMe) Finish() {
s.Out <- 909
}
// Tests manual termination via StopProc()
func TestStopProc(t *testing.T) {
s := new(stopMe)
in := make(chan int, 20)
out := make(chan int, 20)
s.In = in
s.Out = out
// Test normal mode first
RunProc(s)
for i := 0; i < 10; i++ {
in <- i
}
for i := 0; i < 10; i++ {
i2 := <-out
if i2 < 0 {
t.Errorf("%d < 0", i2)
}
}
// Stop without closing chans
StopProc(s)
// Wait for finish signal
fin := <-out
if fin != 909 {
t.Errorf("Invalid final signal: %d", fin)
}
// Run again in Pool mode
s.Component.Mode = ComponentModePool
s.Component.PoolSize = 4
RunProc(s)
for i := 0; i < 10; i++ {
in <- i
}
for i := 0; i < 10; i++ {
i2 := <-out
if i2 < 0 {
t.Errorf("%d < 0", i2)
}
}
// Stop without closing chans
StopProc(s)
// Wait for finish signal
fin = <-out
if fin != 909 {
t.Errorf("Invalid final signal: %d", fin)
}
}