-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathstream.go
232 lines (191 loc) · 4.26 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package lungo
import (
"context"
"errors"
"io"
"sync"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"github.com/256dpi/lungo/bsonkit"
)
// ErrLostOplogPosition may be returned by a stream when the oplog position has
// been lost. This can happen if a consumer is slower than the expiration of
// oplog entries. The stream records this error (see Err) and closes itself
// when the entry it consumed last can no longer be found in the oplog index.
var ErrLostOplogPosition = errors.New("lost oplog position")
// Stream provides a mongo compatible way to read oplog events. All exported
// methods serialize access to the stream state through the internal mutex.
type Stream struct {
	// handle selects the watched namespace: handle[0] is compared with the
	// event's "ns.db" and handle[1] with its "ns.coll"; an empty string
	// disables the respective filter
	handle Handle
	// last is the oplog entry consumed most recently (including entries that
	// were skipped by the namespace filter); nil means start at the beginning
	last bsonkit.Doc
	// pipeline is not consulted by the methods in this file yet (see the TODO
	// in next)
	pipeline bsonkit.List
	// signal is awaited by a blocking next call; a closed channel closes the
	// stream
	signal chan struct{}
	// oplog returns the oplog set that is scanned for new events
	oplog func() *bsonkit.Set
	// cancel is invoked whenever the stream is closed, invalidated or has lost
	// its position
	cancel func()
	// event is the current event handed out by Decode
	event bsonkit.Doc
	// token is the "_id" of the current event and backs ResumeToken
	token interface{}
	// dropped is set once a matching "drop" or "dropDatabase" event has been
	// returned; the following next call then emits an "invalidate" event
	dropped bool
	// closed marks the stream as closed
	closed bool
	// error is the error reported by Err
	error error
	// mutex guards all fields above
	mutex sync.Mutex
}
// Close implements the IChangeStream.Close method. It invokes the stream's
// cancel function, drops the current event together with any recorded error
// and marks the stream as closed. The context argument is ignored and the
// returned error is always nil.
func (s *Stream) Close(context.Context) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// release the stream first, then reset its state in one step
	s.cancel()
	s.event, s.error, s.closed = nil, nil, true

	return nil
}
// Decode implements the IChangeStream.Decode method. It decodes the current
// event into out. Without a current event it returns mongo.ErrNilCursor if the
// stream has been closed and io.EOF otherwise.
func (s *Stream) Decode(out interface{}) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	switch {
	case s.event != nil:
		// hand the current event to the decoder
		if err := bsonkit.Decode(s.event, out); err != nil {
			return err
		}
		return nil
	case s.closed:
		// nothing to decode on a closed stream
		return mongo.ErrNilCursor
	default:
		// stream is open but has no current event
		return io.EOF
	}
}
// Err implements the IChangeStream.Err method. It returns the error recorded
// on the stream, or nil if there is none.
func (s *Stream) Err() error {
	// read the error under the lock
	s.mutex.Lock()
	recorded := s.error
	s.mutex.Unlock()

	return recorded
}
// ID implements the IChangeStream.ID method. The reported id is always zero.
func (s *Stream) ID() int64 {
	// the stream has no distinct id to report
	const id int64 = 0

	// still serialize with the other methods
	s.mutex.Lock()
	defer s.mutex.Unlock()

	return id
}
// Next implements the IChangeStream.Next method. It delegates to next in
// blocking mode and reports whether an event is ready to be decoded.
func (s *Stream) Next(ctx context.Context) bool {
	const blocking = true
	return s.next(ctx, blocking)
}
// ResumeToken implements the IChangeStream.ResumeToken method. It returns the
// BSON encoding of the current token, or nil if no token has been set.
func (s *Stream) ResumeToken() bson.Raw {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.token != nil {
		// the marshal error is discarded; whatever bytes were produced are
		// returned as is
		raw, _ := bson.Marshal(s.token)
		return raw
	}

	// no token available
	return nil
}
// SetBatchSize implements the IChangeStream.SetBatchSize method. The given
// size is ignored.
func (s *Stream) SetBatchSize(int32) {
	// intentionally a no-op
}
// TryNext implements the IChangeStream.TryNext method. It delegates to next in
// non-blocking mode and reports whether an event is ready to be decoded.
func (s *Stream) TryNext(ctx context.Context) bool {
	const blocking = false
	return s.next(ctx, blocking)
}
// next is the shared implementation of Next and TryNext. It advances the
// stream to the next oplog event that matches the stream's handle and reports
// whether such an event has been made current. If no event is pending and
// block is true it waits for the signal channel or for ctx to be done; if
// block is false it returns false right away. The mutex is held for the whole
// call, including the wait.
func (s *Stream) next(ctx context.Context, block bool) bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// a closed or failed stream yields nothing more
	if s.closed || s.error != nil {
		return false
	}

	// after a drop the stream emits one final invalidate event and closes
	if s.dropped {
		invalidate := bsonkit.MustConvert(bson.M{
			"_id":           bson.M{"ts": "drop"},
			"operationType": "invalidate",
			"clusterTime":   bsonkit.Now(),
		})
		s.event = invalidate
		s.token = bsonkit.Get(s.event, "_id")
		s.cancel()
		s.closed = true
		return true
	}

	// normalize the context through ensureContext
	ctx = ensureContext(ctx)

	for {
		// fetch the current oplog
		entries := s.oplog()

		// determine the position following the last consumed entry
		pos := 0
		if s.last != nil {
			at, found := entries.Index[s.last]
			if !found {
				// the last entry is gone, the position cannot be recovered
				s.cancel()
				s.closed = true
				s.error = ErrLostOplogPosition
				return false
			}
			pos = at + 1
		}

		// consume the next entry if there is one
		if pos < len(entries.List) {
			entry := entries.List[pos]

			// extract the relevant details
			token := bsonkit.Get(entry, "_id")
			nsDB := bsonkit.Get(entry, "ns.db")
			nsColl := bsonkit.Get(entry, "ns.coll")
			opType := bsonkit.Get(entry, "operationType")

			// skip entries outside the watched database or collection
			db, coll := s.handle[0], s.handle[1]
			if (db != "" && db != nsDB) || (coll != "" && coll != nsColl) {
				s.last = entry
				continue
			}

			// remember drops so that the following call invalidates the stream
			switch {
			case db != "" && coll != "" && opType == "drop":
				s.dropped = true
			case db != "" && opType == "dropDatabase":
				s.dropped = true
			}

			// TODO: Filter with pipeline.

			// publish the entry as the current event
			s.last, s.event, s.token = entry, entry, token
			return true
		}

		// non-blocking callers return immediately, recording a context error
		// if the context is already done
		if !block {
			select {
			case <-ctx.Done():
				s.error = ctx.Err()
			default:
			}
			return false
		}

		// wait until new entries are signaled or the context is done
		select {
		case _, open := <-s.signal:
			if !open {
				// a closed signal channel closes the stream
				s.cancel()
				s.closed = true
				return false
			}
		case <-ctx.Done():
			s.error = ctx.Err()
			return false
		}
	}
}