-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathqueue.go
109 lines (97 loc) · 2.01 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package crawler
import (
"container/list"
"context"
"sync"
)
// Queue is used by workers to keep track of the urls that need to be fetched.
// Queue must be safe to use concurrently.
type Queue interface {
PushBack(*Request) error
PopFront() (*Request, error)
}
// InMemoryQueue holds a queue of items to be crawled in memory
type InMemoryQueue struct {
ctx context.Context
in chan *Request
out chan *Request
done chan struct{}
inFlight int64
mut sync.Mutex
}
// NewInMemoryQueue returns an in memory queue ready to be used by different workers
func NewInMemoryQueue(ctx context.Context) *InMemoryQueue {
q := &InMemoryQueue{
ctx: ctx,
in: make(chan *Request),
out: make(chan *Request),
done: make(chan struct{}),
}
go q.run()
return q
}
func (q *InMemoryQueue) run() {
queue := list.New()
for {
var (
out = q.out
next *Request
)
front := queue.Front()
if front == nil {
out = nil
} else {
next = front.Value.(*Request)
}
select {
case req := <-q.in:
queue.PushBack(req)
case out <- next:
queue.Remove(front)
case <-q.ctx.Done():
return
case <-q.done:
return
}
}
}
// PushBack adds a request to the queue
func (q *InMemoryQueue) PushBack(req *Request) error {
if req.finished {
panic("requeueing finished request is forbidden")
}
q.mut.Lock()
defer q.mut.Unlock()
req.onFinish = func() {
q.mut.Lock()
defer q.mut.Unlock()
q.inFlight--
if q.inFlight == 0 {
close(q.done)
}
}
select {
case <-q.ctx.Done():
return q.ctx.Err()
case <-q.done:
panic("cannot push after queue was exhausted")
case q.in <- req:
q.inFlight++
}
return nil
}
// PopFront gets the next request from the queue.
// It will return a nil request and a nil error if the queue is empty.
func (q *InMemoryQueue) PopFront() (*Request, error) {
select {
case req := <-q.out:
if req.finished {
panic("popped message had already been finished")
}
return req, nil
case <-q.done:
return nil, nil
case <-q.ctx.Done():
return nil, q.ctx.Err()
}
}