-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_scheduler.go
68 lines (57 loc) · 1.43 KB
/
concurrent_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package edgerunner
import (
"sync"
)
// ConcurrentScheduler starts a new task each time its reader reports a signal
// and retires the previously accepted task once the replacement has
// initialized successfully.
type ConcurrentScheduler struct {
	// reader is consulted after every scheduling attempt; its boolean result
	// decides whether another task gets scheduled.
	reader SignalReader
	// factory is invoked once per scheduling attempt to produce the value that
	// is wrapped in a SignalingTask.
	factory TaskFactory

	// waiter counts the task goroutines that are still running.
	waiter *sync.WaitGroup
	// startup is handed to every SignalingTask; the scheduler receives one
	// value from it per scheduled task and stores it in err.
	// NOTE(review): presumably the task's Init outcome - confirm in SignalingTask.
	startup chan error
	// shutdown tells Schedule that it is time to close down. It is also handed
	// to every SignalingTask.
	shutdown chan struct{}

	// head is the most recently accepted task; nil until a task starts cleanly.
	head *SignalingTask
	// err holds the value last received from startup and is what Schedule returns.
	err error
}
// NewConcurrentScheduler builds a scheduler around the supplied signal reader
// and task factory. It allocates a fresh WaitGroup and creates the startup and
// shutdown channels, each with a buffer of two.
func NewConcurrentScheduler(reader SignalReader, factory TaskFactory) *ConcurrentScheduler {
	scheduler := new(ConcurrentScheduler)
	scheduler.reader = reader
	scheduler.factory = factory
	scheduler.waiter = new(sync.WaitGroup)
	scheduler.startup = make(chan error, 2)
	scheduler.shutdown = make(chan struct{}, 2)
	return scheduler
}
// Schedule runs the scheduling loop in a background goroutine and blocks until
// a value arrives on the shutdown channel. It then closes the most recently
// accepted task (if there is one), waits for every task goroutine tracked by
// the WaitGroup to return, closes both channels, and returns the value last
// received from the startup channel (nil when that startup succeeded).
//
// Because both channels are closed on the way out, Schedule is single-use: a
// second call on the same instance would panic when closing them again.
func (this *ConcurrentScheduler) Schedule() error {
	go this.scheduleTasks()
	<-this.shutdown

	// head is only assigned after a successful startup, so it is still nil
	// when no task ever came up (for example, the very first one failed and
	// the reader then reported that no more signals are coming). Guard the
	// call the same way runTask guards its previous task.
	if this.head != nil {
		this.head.Close()
	}

	this.waiter.Wait()
	close(this.shutdown)
	close(this.startup)
	return this.err
}
// scheduleTasks keeps scheduling tasks, one after another, for as long as
// scheduleNextTask reports true. Once it reports false, a single value is sent
// on the shutdown channel.
func (this *ConcurrentScheduler) scheduleTasks() {
	for more := true; more; {
		more = this.scheduleNextTask()
	}

	// Signal the shutdown channel that we're ready to close.
	this.shutdown <- struct{}{}
}
// scheduleNextTask creates a new SignalingTask from the factory, runs it in
// its own goroutine, and waits for one value on the startup channel. A non-nil
// value is kept in err and the proposed task is closed; otherwise the proposed
// task becomes the new head. The return value is whatever the reader reports
// next, which tells the caller whether to schedule another task.
func (this *ConcurrentScheduler) scheduleNextTask() bool {
	// Register the goroutine before launching it so Wait cannot miss it.
	this.waiter.Add(1)

	proposed := NewSignalingTask(this.factory(), this.startup, this.shutdown)
	previous := this.head
	go this.runTask(proposed, previous)

	this.err = <-this.startup
	switch {
	case this.err != nil:
		// Startup failed: discard the candidate and keep the current head.
		proposed.Close()
	default:
		this.head = proposed
	}

	// Wait for some kind of signal.
	return this.reader.Read()
}
// runTask is the body of each task goroutine. It initializes the proposed
// task; if that fails it returns immediately and the previous task is left
// untouched. On success it closes the previous task (when there is one) and
// then calls Listen on the proposed task. The WaitGroup is always released
// when the function returns.
func (this *ConcurrentScheduler) runTask(proposed, previous *SignalingTask) {
	defer this.waiter.Done()

	if initErr := proposed.Init(); initErr != nil {
		return // nothing to hand over to; the previous task is not closed here
	}

	// The replacement is up, so the task it supersedes can be retired.
	if previous != nil {
		previous.Close()
	}

	proposed.Listen()
}