Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Address CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravind Srinivasan committed May 15, 2017
1 parent 436fdfe commit d492504
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions services/outputhost/deadletterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type deadLetterQueue struct {
closeCh chan struct{}
consumerM3Client metrics.Client
m3Client metrics.Client
dlqWG sync.WaitGroup
wg sync.WaitGroup
}

func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.ConsumerGroupDescription, metaClient metadata.TChanMetadataService, m3Client metrics.Client, consumerM3Client metrics.Client) (dlq *deadLetterQueue, err error) {
Expand All @@ -96,7 +96,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con
m3Client: m3Client,
}

dlq.dlqWG.Add(2)
dlq.wg.Add(2)
go dlq.publishBuffer()
go dlq.publish()

Expand Down Expand Up @@ -142,7 +142,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con
}

func (dlq *deadLetterQueue) publishBuffer() {
defer dlq.dlqWG.Done()
defer dlq.wg.Done()
var buffer []*cherami.ConsumerMessage
var peekVal *cherami.ConsumerMessage
var dlqPublishBufferOutCh chan *cherami.ConsumerMessage
Expand Down Expand Up @@ -191,7 +191,7 @@ func (dlq *deadLetterQueue) publishBuffer() {
}

func (dlq *deadLetterQueue) publish() {
defer dlq.dlqWG.Done()
defer dlq.wg.Done()
var err error
for {
select {
Expand Down Expand Up @@ -261,6 +261,6 @@ func (dlq *deadLetterQueue) close() {
dlq.publisher.Close()
}
close(dlq.closeCh)
dlq.dlqWG.Wait()
dlq.wg.Wait()
}
}

0 comments on commit d492504

Please sign in to comment.