From d7c51937574eac1da7a55d06781d88acdf29261e Mon Sep 17 00:00:00 2001 From: Aravind Srinivasan Date: Wed, 10 May 2017 09:28:39 -0700 Subject: [PATCH 1/2] Make sure to wait for DLQ routines to exit We need to make sure we actually wait for the DLQ routines to exit. --- services/outputhost/deadletterqueue.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/services/outputhost/deadletterqueue.go b/services/outputhost/deadletterqueue.go index a22bd96c..658942b8 100644 --- a/services/outputhost/deadletterqueue.go +++ b/services/outputhost/deadletterqueue.go @@ -22,6 +22,7 @@ package outputhost import ( "fmt" + "sync" "time" client "github.com/uber/cherami-client-go/client/cherami" @@ -81,6 +82,7 @@ type deadLetterQueue struct { closeCh chan struct{} consumerM3Client metrics.Client m3Client metrics.Client + dlqWG sync.WaitGroup } func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.ConsumerGroupDescription, metaClient metadata.TChanMetadataService, m3Client metrics.Client, consumerM3Client metrics.Client) (dlq *deadLetterQueue, err error) { @@ -94,6 +96,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con m3Client: m3Client, } + dlq.dlqWG.Add(2) go dlq.publishBuffer() go dlq.publish() @@ -139,6 +142,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con } func (dlq *deadLetterQueue) publishBuffer() { + defer dlq.dlqWG.Done() var buffer []*cherami.ConsumerMessage var peekVal *cherami.ConsumerMessage var dlqPublishBufferOutCh chan *cherami.ConsumerMessage @@ -187,6 +191,7 @@ func (dlq *deadLetterQueue) publishBuffer() { } func (dlq *deadLetterQueue) publish() { + defer dlq.dlqWG.Done() var err error for { select { @@ -256,5 +261,6 @@ func (dlq *deadLetterQueue) close() { dlq.publisher.Close() } close(dlq.closeCh) + dlq.dlqWG.Wait() } } From d492504d704a4ef5c03b89185d4ad12b246090a1 Mon Sep 17 00:00:00 2001 From: Aravind Srinivasan Date: Mon, 15 May 2017 09:01:48 -0700 Subject: [PATCH 2/2] Address CR comments --- services/outputhost/deadletterqueue.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/outputhost/deadletterqueue.go b/services/outputhost/deadletterqueue.go index 658942b8..a135fad8 100644 --- a/services/outputhost/deadletterqueue.go +++ b/services/outputhost/deadletterqueue.go @@ -82,7 +82,7 @@ type deadLetterQueue struct { closeCh chan struct{} consumerM3Client metrics.Client m3Client metrics.Client - dlqWG sync.WaitGroup + wg sync.WaitGroup } func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.ConsumerGroupDescription, metaClient metadata.TChanMetadataService, m3Client metrics.Client, consumerM3Client metrics.Client) (dlq *deadLetterQueue, err error) { @@ -96,7 +96,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con m3Client: m3Client, } - dlq.dlqWG.Add(2) + dlq.wg.Add(2) go dlq.publishBuffer() go dlq.publish() @@ -142,7 +142,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con } func (dlq *deadLetterQueue) publishBuffer() { - defer dlq.dlqWG.Done() + defer dlq.wg.Done() var buffer []*cherami.ConsumerMessage var peekVal *cherami.ConsumerMessage var dlqPublishBufferOutCh chan *cherami.ConsumerMessage @@ -191,7 +191,7 @@ func (dlq *deadLetterQueue) publishBuffer() { } func (dlq *deadLetterQueue) publish() { - defer dlq.dlqWG.Done() + defer dlq.wg.Done() var err error for { select { @@ -261,6 +261,6 @@ func (dlq *deadLetterQueue) close() { dlq.publisher.Close() } close(dlq.closeCh) - dlq.dlqWG.Wait() + dlq.wg.Wait() } }