diff --git a/services/outputhost/deadletterqueue.go b/services/outputhost/deadletterqueue.go index a22bd96c..658942b8 100644 --- a/services/outputhost/deadletterqueue.go +++ b/services/outputhost/deadletterqueue.go @@ -22,6 +22,7 @@ package outputhost import ( "fmt" + "sync" "time" client "github.com/uber/cherami-client-go/client/cherami" @@ -81,6 +82,7 @@ type deadLetterQueue struct { closeCh chan struct{} consumerM3Client metrics.Client m3Client metrics.Client + dlqWG sync.WaitGroup } func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.ConsumerGroupDescription, metaClient metadata.TChanMetadataService, m3Client metrics.Client, consumerM3Client metrics.Client) (dlq *deadLetterQueue, err error) { @@ -94,6 +96,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con m3Client: m3Client, } + dlq.dlqWG.Add(2) go dlq.publishBuffer() go dlq.publish() @@ -139,6 +142,7 @@ func newDeadLetterQueue(ctx thrift.Context, lclLg bark.Logger, cgDesc shared.Con } func (dlq *deadLetterQueue) publishBuffer() { + defer dlq.dlqWG.Done() var buffer []*cherami.ConsumerMessage var peekVal *cherami.ConsumerMessage var dlqPublishBufferOutCh chan *cherami.ConsumerMessage @@ -187,6 +191,7 @@ func (dlq *deadLetterQueue) publishBuffer() { } func (dlq *deadLetterQueue) publish() { + defer dlq.dlqWG.Done() var err error for { select { @@ -256,5 +261,6 @@ func (dlq *deadLetterQueue) close() { dlq.publisher.Close() } close(dlq.closeCh) + dlq.dlqWG.Wait() } }