Skip to content

Commit ab23f61

Browse files
committed
refactor to remove recursion
1 parent 6696213 commit ab23f61

File tree

1 file changed

+19
-20
lines changed

1 file changed

+19
-20
lines changed

rtq/rtq.go

+19-20
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,32 @@ func NewQ(name string, cfg Config) (*rtQ, error) {
9595
statusError: cfg.Logger.Error,
9696
}
9797

98-
go rtq.tx() // start transmitting
99-
//rtq.QStats(b) // start status monitoring
98+
// start transmitting
99+
go func() {
100+
// endless loop
101+
for {
102+
rtq.tx()
103+
rtq.status("TransmissionStatus", zapcore.Field{
104+
Key: "WaitSecond",
105+
Type: zapcore.Int32Type,
106+
Integer: int64(rtq.cfg.Interval / time.Second),
107+
})
108+
109+
// wait for interval to pass
110+
<-time.After(rtq.cfg.Interval)
111+
}
112+
113+
}()
100114

101115
return rtq, nil
102116
}
103117

104118
// getMessageBatch starts at the first record and
105119
// builds a MessageBatch for each found key up to the
106120
// batch size.
107-
func (rt *rtQ) getMessageBatch() MessageBatch {
121+
func (rt *rtQ) getMessageBatch() *MessageBatch {
108122
uuidV4, _ := uuid.NewV4()
109-
mb := MessageBatch{
123+
mb := &MessageBatch{
110124
Uuid: uuidV4.String(),
111125
}
112126

@@ -172,7 +186,7 @@ func (rt *rtQ) getMessageBatch() MessageBatch {
172186
}
173187

174188
// transmit attempts to transmit a message batch
175-
func (rt *rtQ) transmit(msgB MessageBatch) error {
189+
func (rt *rtQ) transmit(msgB *MessageBatch) error {
176190

177191
jsonStr, err := json.Marshal(msgB)
178192
if err != nil {
@@ -223,7 +237,6 @@ func (rt *rtQ) tx() {
223237
if mb.Size < 1 {
224238
// nothing to send
225239
rt.status("TransmissionSkipEmpty")
226-
rt.waitTx()
227240
return
228241
}
229242

@@ -251,7 +264,6 @@ func (rt *rtQ) tx() {
251264
if rt.mCount > rt.cfg.MaxInQueue {
252265
rt.remove <- rt.mCount - rt.cfg.MaxInQueue
253266
}
254-
rt.waitTx()
255267
return
256268
}
257269

@@ -264,19 +276,6 @@ func (rt *rtQ) tx() {
264276
Integer: int64(mb.Size),
265277
})
266278
rt.remove <- mb.Size
267-
rt.waitTx()
268-
}
269-
270-
// waitTx sleeps for rt.cfg.Interval * time.Second then performs a tx.
271-
func (rt *rtQ) waitTx() {
272-
rt.status("TransmissionStatus", zapcore.Field{
273-
Key: "WaitSecond",
274-
Type: zapcore.Int32Type,
275-
Integer: int64(rt.cfg.Interval / time.Second),
276-
})
277-
278-
time.Sleep(rt.cfg.Interval)
279-
rt.tx() // recursion
280279
}
281280

282281
// Write to the queue

0 commit comments

Comments
 (0)