@@ -62,67 +62,15 @@ func NewQ(name string, cfg Config) (*rtQ, error) {
62
62
return nil , err
63
63
}
64
64
65
- // make our message queue bucket
66
- err = db .Update (func (tx * bolt.Tx ) error {
67
- _ , err := tx .CreateBucketIfNotExists ([]byte ("mq" ))
68
- if err != nil {
69
- return fmt .Errorf ("create bucket: %s" , err )
70
- }
71
- return nil
72
- })
65
+ err = ensureMqBucket (db )
73
66
if err != nil {
74
67
return nil , err
75
68
}
76
69
77
70
mq := make (chan Message , 0 )
78
71
remove := make (chan int , 0 )
79
72
80
- go func () {
81
- // begin kv writer
82
- for {
83
- select {
84
- case msg := <- mq :
85
- db .Update (func (tx * bolt.Tx ) error {
86
- uuidV4 , _ := uuid .NewV4 ()
87
-
88
- msg .Time = time .Now ()
89
- msg .Uuid = uuidV4 .String ()
90
-
91
- b := tx .Bucket ([]byte ("mq" ))
92
- id , _ := b .NextSequence ()
93
-
94
- msg .Seq = fmt .Sprintf ("%d%d%d%012d" , msg .Time .Year (), msg .Time .Month (), msg .Time .Day (), id )
95
-
96
- buf , err := json .Marshal (msg )
97
- if err != nil {
98
- return err
99
- }
100
- b .Put ([]byte (msg .Seq ), buf )
101
-
102
- return nil
103
- })
104
- case rmi := <- remove :
105
- db .Update (func (tx * bolt.Tx ) error {
106
- bucket := tx .Bucket ([]byte ("mq" ))
107
-
108
- c := bucket .Cursor ()
109
-
110
- // get the first rt.cfg.Batch
111
- i := 1
112
- for k , _ := c .First (); k != nil ; k , _ = c .Next () {
113
- c .Delete ()
114
- i ++
115
- if i > rmi {
116
- break
117
- }
118
- }
119
-
120
- return nil
121
- })
122
-
123
- }
124
- }
125
- }()
73
+ go messageHandler (db , mq , remove )
126
74
127
75
rtq := & rtQ {
128
76
db : db , // database
@@ -266,3 +214,66 @@ func (rt *rtQ) QWrite(msg Message) error {
266
214
267
215
return nil
268
216
}
217
+
218
+ // messageHandler listens to the mq and remove channels to add and
219
+ // remove messages
220
+ func messageHandler (db * bolt.DB , mq chan Message , remove chan int ) {
221
+ // begin kv writer
222
+ for {
223
+ select {
224
+ case msg := <- mq :
225
+ db .Update (func (tx * bolt.Tx ) error {
226
+ uuidV4 , _ := uuid .NewV4 ()
227
+
228
+ msg .Time = time .Now ()
229
+ msg .Uuid = uuidV4 .String ()
230
+
231
+ b := tx .Bucket ([]byte ("mq" ))
232
+ id , _ := b .NextSequence ()
233
+
234
+ msg .Seq = fmt .Sprintf ("%d%d%d%012d" , msg .Time .Year (), msg .Time .Month (), msg .Time .Day (), id )
235
+
236
+ buf , err := json .Marshal (msg )
237
+ if err != nil {
238
+ return err
239
+ }
240
+ b .Put ([]byte (msg .Seq ), buf )
241
+
242
+ return nil
243
+ })
244
+ case rmi := <- remove :
245
+ db .Update (func (tx * bolt.Tx ) error {
246
+ bucket := tx .Bucket ([]byte ("mq" ))
247
+
248
+ c := bucket .Cursor ()
249
+
250
+ // get the first rt.cfg.Batch
251
+ i := 1
252
+ for k , _ := c .First (); k != nil ; k , _ = c .Next () {
253
+ c .Delete ()
254
+ i ++
255
+ if i > rmi {
256
+ break
257
+ }
258
+ }
259
+
260
+ return nil
261
+ })
262
+
263
+ }
264
+ }
265
+ }
266
+
267
+ // ensureMqBocket makes a bucket for the message queue
268
+ func ensureMqBucket (db * bolt.DB ) error {
269
+ // make our message queue bucket
270
+ err := db .Update (func (tx * bolt.Tx ) error {
271
+ _ , err := tx .CreateBucketIfNotExists ([]byte ("mq" ))
272
+ if err != nil {
273
+ return fmt .Errorf ("create bucket: %s" , err )
274
+ }
275
+ return nil
276
+ })
277
+
278
+ return err
279
+ }
0 commit comments