-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworkerpoolxt.go
100 lines (87 loc) · 1.98 KB
/
workerpoolxt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package workerpoolxt
import (
"context"
"sync"
"time"
"github.com/gammazero/workerpool"
)
// New creates a WorkerPoolXT backed by a workerpool with maxWorkers workers.
//
// ctx becomes the pool's default context: wrap assigns it to every submitted
// job that has no Context of its own. A nil ctx is replaced with
// context.Background() so that such a job never hands a nil parent to
// context.WithCancel, which panics.
//
// New also starts the goroutine (processResults) that collects job results;
// it keeps running until the pool is shut down through stop.
func New(ctx context.Context, maxWorkers int) *WorkerPoolXT {
	if ctx == nil {
		ctx = context.Background()
	}
	p := &WorkerPoolXT{
		WorkerPool: workerpool.New(maxWorkers),
		context:    ctx,
		result:     make(chan Result),
		kill:       make(chan struct{}),
	}
	go p.processResults()
	return p
}
// NewWithOptions builds a pool exactly as New does and then installs o as the
// pool-wide default options. wrap copies these defaults onto any job whose
// own Options are nil.
func NewWithOptions(ctx context.Context, maxWorkers int, o Options) *WorkerPoolXT {
	pool := New(ctx, maxWorkers)
	pool.options = o
	return pool
}
// WorkerPoolXT extends `github.com/gammazero/workerpool` with collection of
// a Result for every job submitted through SubmitXT.
type WorkerPoolXT struct {
	// WorkerPool is the embedded pool that runs the wrapped job funcs.
	*workerpool.WorkerPool

	// Pool-wide defaults, applied by wrap to jobs that leave the
	// corresponding field nil.
	context context.Context
	options Options

	// result carries each finished job's Result to processResults, which
	// appends it to results.
	result  chan Result
	results []Result

	// once makes shutdown run a single time; kill is the handshake channel
	// stop uses to wait until processResults has finished collecting.
	once sync.Once
	kill chan struct{}
}
// SubmitXT submits j to the embedded worker pool. The job is first wrapped
// (see wrap) so that its Result is sent to the pool's collector and can be
// returned later by StopWaitXT. j is received by value, so the caller's Job
// is not modified.
func (p *WorkerPoolXT) SubmitXT(j Job) {
	task := p.wrap(&j)
	p.Submit(task)
}
// StopWaitXT shuts the pool down through stop(false), which calls StopWait on
// the embedded pool, and then returns the results collected so far. Shutdown
// runs only once; later calls simply return the collected results again.
//
// The returned slice is the pool's internal slice, not a copy.
func (p *WorkerPoolXT) StopWaitXT() (rs []Result) {
	p.stop(false)
	rs = p.results
	return rs
}
// processResults collects job results. It appends every Result received on
// p.result to p.results until that channel is closed, then receives once from
// p.kill, which releases the matching send in stop. New starts it as a
// goroutine.
func (p *WorkerPoolXT) processResults() {
	for r := range p.result {
		p.results = append(p.results, r)
	}
	// p.result is closed and drained, so no further appends will happen;
	// complete the handshake so stop can return.
	<-p.kill
}
// stop shuts the pool down, at most once across all callers.
//
// When now is true it calls Stop on the embedded pool, otherwise StopWait.
// It then closes p.result so processResults leaves its receive loop, and
// sends on p.kill, which blocks until processResults has received the signal.
func (p *WorkerPoolXT) stop(now bool) {
	shutdown := func() {
		// Pick the embedded pool's shutdown method, then run it.
		halt := p.StopWait
		if now {
			halt = p.Stop
		}
		halt()

		close(p.result)
		p.kill <- struct{}{}
	}
	p.once.Do(shutdown)
}
// wrap returns the func that is handed to the underlying `workerpool` for
// job j.
//
// When the returned func runs it:
//   - fills in the pool's default Options and Context for whichever of the
//     two the job left nil,
//   - derives a cancellable child context from the job's Context,
//   - creates the job's result channel and records its start time,
//   - starts j.runDone in its own goroutine, and
//   - sends the value returned by j.getResult on p.result.
//
// NOTE(review): runDone and getResult are defined outside this view;
// presumably getResult blocks until runDone reports a result — confirm.
func (p *WorkerPoolXT) wrap(j *Job) func() {
	task := func() {
		// Job-level settings take precedence; pool defaults only fill gaps.
		if j.Options == nil {
			j.Options = p.options
		}
		if j.Context == nil {
			j.Context = p.context
		}

		childCtx, cancel := context.WithCancel(j.Context)
		j.childCtx = childCtx
		j.done = cancel
		j.result = make(chan Result)
		j.startedAt = time.Now()

		go j.runDone()

		res := j.getResult()
		p.result <- res
	}
	return task
}