Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 71d50f1

Browse files
committedAug 9, 2024·
finish
1 parent 3b45fa9 commit 71d50f1

10 files changed

+846
-264
lines changed
 

‎pkg/apperror/errors.go

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
ErrorTypeBufferFull ErrorType = 104
1919
ErrorTypeDuplicate ErrorType = 105
2020
ErrorTypeNotExist ErrorType = 106
21+
ErrorTypeClosed ErrorType = 107
2122

2223
ErrorTypeConnectionFailed ErrorType = 201
2324
ErrorTypeConnectionNotFound ErrorType = 202
@@ -50,6 +51,8 @@ func (t ErrorType) String() string {
5051
return "Duplicate"
5152
case ErrorTypeNotExist:
5253
return "NotExist"
54+
case ErrorTypeClosed:
55+
return "Closed"
5356
case ErrorTypeConnectionFailed:
5457
return "ConnectionFailed"
5558
case ErrorTypeConnectionNotFound:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package dynstream
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
)
9+
10+
func runDynamicStream(pathCount int, eventCount int, times int) {
11+
handler := &incHandler{}
12+
13+
ds := NewDynamicStreamDefault(handler)
14+
ds.Start()
15+
16+
for i := 0; i < pathCount; i++ {
17+
ds.AddPath(PathAndDest[D]{Path: Path(fmt.Sprintf("p%d", i)), Dest: D{}})
18+
}
19+
20+
wg := &sync.WaitGroup{}
21+
wg.Add(eventCount * pathCount)
22+
23+
sendEvents := func(path Path, wg *sync.WaitGroup) {
24+
total := &atomic.Int64{}
25+
for i := 0; i < eventCount; i++ {
26+
ds.In() <- &inc{times: times, n: total, done: wg, path: path}
27+
}
28+
}
29+
30+
for i := 0; i < pathCount; i++ {
31+
sendEvents(Path(fmt.Sprintf("p%d", i)), wg)
32+
}
33+
34+
wg.Wait()
35+
ds.Close()
36+
}
37+
38+
func runGoroutine(pathCount int, eventCount int, times int) {
39+
handler := &incHandler{}
40+
chans := make([]chan *inc, pathCount)
41+
for i := 0; i < pathCount; i++ {
42+
chans[i] = make(chan *inc, eventCount)
43+
}
44+
45+
wg := &sync.WaitGroup{}
46+
wg.Add(eventCount * pathCount)
47+
48+
sendEvents := func(ch chan *inc, wg *sync.WaitGroup) {
49+
total := &atomic.Int64{}
50+
for i := 0; i < eventCount; i++ {
51+
ch <- &inc{times: times, n: total, done: wg}
52+
}
53+
}
54+
55+
for i := 0; i < pathCount; i++ {
56+
go sendEvents(chans[i], wg)
57+
}
58+
59+
for i := 0; i < pathCount; i++ {
60+
go func(ch chan *inc) {
61+
for e := range ch {
62+
handler.Handle(e, D{})
63+
}
64+
}(chans[i])
65+
}
66+
67+
wg.Wait()
68+
for i := 0; i < pathCount; i++ {
69+
close(chans[i])
70+
}
71+
}
72+
73+
func BenchmarkDSDynamicStream1000x1000x100(b *testing.B) {
74+
for k := 0; k < b.N; k++ {
75+
runDynamicStream(1000, 1000, 100)
76+
}
77+
}
78+
79+
func BenchmarkDSDynamicStream1000000x1000x100(b *testing.B) {
80+
for k := 0; k < b.N; k++ {
81+
runDynamicStream(1000000, 100, 100)
82+
}
83+
}
84+
85+
func BenchmarkDSGroutine1000x1000x100(b *testing.B) {
86+
for k := 0; k < b.N; k++ {
87+
runGoroutine(1000, 1000, 100)
88+
}
89+
}
90+
91+
func BenchmarkDSGroutine1000000x1000x100(b *testing.B) {
92+
for k := 0; k < b.N; k++ {
93+
runGoroutine(1000000, 100, 100)
94+
}
95+
}

0 commit comments

Comments
 (0)