Skip to content

Commit

Permalink
Consumer: reduce ticker allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
faceair committed Feb 4, 2018
1 parent 0f4f8ca commit 733cd80
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,20 +441,20 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage
msgSent := false
var firstAttempt bool
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)
expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

for i, msg := range msgs {
messageSelect:
select {
case child.messages <- msg:
msgSent = true
firstAttempt = true
case <-expiryTicker.C:
if !msgSent {
if !firstAttempt {
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
Expand All @@ -465,16 +465,16 @@ feederLoop:
} else {
// current message has not been sent, return to select
// statement
msgSent = false
firstAttempt = false
goto messageSelect
}
}
}

expiryTicker.Stop()
child.broker.acks.Done()
}

expiryTicker.Stop()
close(child.messages)
close(child.errors)
}
Expand Down

0 comments on commit 733cd80

Please sign in to comment.