-
Notifications
You must be signed in to change notification settings - Fork 79
/
Copy pathtest.go
138 lines (118 loc) · 4 KB
/
test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package vicetest
import (
"fmt"
"sync"
"testing"
"time"
"github.com/matryer/is"
"github.com/matryer/vice/v2"
)
// Transport runs standard transport tests. All Transport types pass
// this test.
//
// For more information see https://github.com/matryer/vice/blob/master/docs/writing-transports.md
//
// Transports should be initialised with a clean state. Old persisted messages
// can interfere with the test.
// After the tests are run, the transport is closed (since that is part
// of the spec).
func Transport(t *testing.T, transport func() vice.Transport) {
t.Run("testStandardTransportBehaviour", func(t *testing.T) {
testStandardTransportBehaviour(t, transport)
})
t.Run("testSendChannelsDontBlock", func(t *testing.T) {
testSendChannelsDontBlock(t, transport)
})
}
// testSendChannelsDontBlock ensures that send channels don't block, even
// if nothing (we know of) is receiving them.
func testSendChannelsDontBlock(t *testing.T, newTransport func() vice.Transport) {
is := is.New(t)
transport := newTransport()
select {
case transport.Send("something") <- []byte("message"):
return
case <-time.After(1 * time.Second):
is.Fail() // send channels shouldn't block
}
}
// testStandardTransportBehaviour tests that transports load balance
// over many Receive channels.
func testStandardTransportBehaviour(t *testing.T, newTransport func() vice.Transport) {
is := is.New(t)
defer func() {
if r := recover(); r != nil {
is.Fail() // old messages may have confused test
}
}()
transport := newTransport()
transport1 := newTransport()
transport2 := newTransport()
doneChan := make(chan struct{})
messages := make(map[string][][]byte)
var wg sync.WaitGroup
go func() {
defer close(doneChan)
for {
select {
case <-transport.Done():
return
case err := <-transport.ErrChan():
is.NoErr(err)
// test local load balancing with the same transport
case msg := <-transport.Receive("vicechannel1"):
messages["vicechannel1"] = append(messages["vicechannel1"], msg)
wg.Done()
case msg := <-transport.Receive("vicechannel1"):
messages["vicechannel1"] = append(messages["vicechannel1"], msg)
wg.Done()
case msg := <-transport.Receive("vicechannel1"):
messages["vicechannel1"] = append(messages["vicechannel1"], msg)
wg.Done()
case msg := <-transport.Receive("vicechannel2"):
messages["vicechannel2"] = append(messages["vicechannel2"], msg)
wg.Done()
case msg := <-transport.Receive("vicechannel2"):
messages["vicechannel2"] = append(messages["vicechannel2"], msg)
wg.Done()
case msg := <-transport.Receive("vicechannel3"):
messages["vicechannel3"] = append(messages["vicechannel3"], msg)
wg.Done()
// test distibuted load balancing
case msg := <-transport.Receive("vicechannel4"):
messages["vicechannel4.1"] = append(messages["vicechannel4.1"], msg)
wg.Done()
case msg := <-transport1.Receive("vicechannel4"):
messages["vicechannel4.2"] = append(messages["vicechannel4.2"], msg)
wg.Done()
case msg := <-transport2.Receive("vicechannel4"):
messages["vicechannel4.3"] = append(messages["vicechannel4.3"], msg)
wg.Done()
}
}
}()
// Let's give some time to initialize all receiving channels
time.Sleep(time.Millisecond * 10)
// send 100 messages down each chan
for i := 0; i < 100; i++ {
wg.Add(4)
msg := []byte(fmt.Sprintf("message %d", i+1))
transport.Send("vicechannel1") <- msg
transport.Send("vicechannel2") <- msg
transport.Send("vicechannel3") <- msg
transport.Send("vicechannel4") <- msg
}
wg.Wait()
transport.Stop()
transport1.Stop()
transport2.Stop()
<-doneChan
is.Equal(len(messages), 6)
is.Equal(len(messages["vicechannel1"]), 100)
is.Equal(len(messages["vicechannel2"]), 100)
is.Equal(len(messages["vicechannel3"]), 100)
is.True(len(messages["vicechannel4.1"]) != 100)
is.True(len(messages["vicechannel4.2"]) != 100)
is.True(len(messages["vicechannel4.3"]) != 100)
is.Equal(len(messages["vicechannel4.1"])+len(messages["vicechannel4.2"])+len(messages["vicechannel4.3"]), 100)
}