-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel.go
120 lines (98 loc) · 2.18 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// nolint: dupl
package execs
import (
"os"
"sync"
"time"
"gopkg.in/gomisc/errors.v1"
)
type parallelGroup struct {
members Members
pool map[string]Process
}
// NewParallel конструктор параллельного запуска раннеров группы
func NewParallel(members ...Member) Runner {
return ¶llelGroup{
members: members,
pool: make(map[string]Process),
}
}
// Run коллбэк запуска раннера группы
func (g parallelGroup) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
exitTrace := make(ExitTrace, 0, len(g.members))
var (
wg sync.WaitGroup
traces = make(chan ExitEvent, len(g.members))
)
for m := 0; m < len(g.members); m++ {
wg.Add(1)
member := g.members[m]
p := Background(member)
g.pool[member.Name] = p
go func() {
Done:
for {
select {
case <-p.Ready():
break Done
case err := <-p.Wait():
exit := ExitEvent{Member: member}
if err != nil {
exit.Err = errors.Ctx().
Str("runner", member.Name).
Wrap(err, "exit runner status")
}
traces <- exit
break Done
}
}
wg.Done()
}()
}
wg.Wait()
close(traces)
close(ready)
for event := range traces {
exitTrace = append(exitTrace, event)
}
return g.wait(signals, exitTrace)
}
func (g *parallelGroup) wait(signals <-chan os.Signal, exitTrace ExitTrace) error {
var exitErr error
exited := map[string]struct{}{}
signal := <-signals
if len(exitTrace) > 0 {
for _, exitEvent := range exitTrace {
exited[exitEvent.Member.Name] = struct{}{}
if exitEvent.Err != nil {
exitErr = errors.And(exitErr, exitEvent.Err)
}
}
}
for m := len(g.members) - 1; m >= 0; m-- {
member := g.members[m]
if _, isExited := exited[member.Name]; isExited {
continue
}
if p, ok := g.pool[member.Name]; ok {
p.Signal(signal)
Exited:
for {
select {
case err := <-p.Wait():
if err != nil {
exitErr = errors.And(
exitErr,
errors.Ctx().
Str("runner", member.Name).
Wrap(err, "exit runner status"),
)
}
break Exited
case <-time.After(time.Millisecond * 100):
}
}
}
}
return exitErr
}