diff --git a/client/cherami/outputhostconnection.go b/client/cherami/outputhostconnection.go index 8a45c31..a9bcf12 100644 --- a/client/cherami/outputhostconnection.go +++ b/client/cherami/outputhostconnection.go @@ -56,7 +56,6 @@ type ( logger bark.Logger reporter metrics.Reporter - lk sync.Mutex opened int32 closed int32 wg sync.WaitGroup @@ -109,10 +108,8 @@ func newOutputHostConnection(ackClient cherami.TChanBOut, wsConnector WSConnecto } func (conn *outputHostConnection) open() error { - conn.lk.Lock() - defer conn.lk.Unlock() - if atomic.LoadInt32(&conn.opened) == 0 { + if atomic.CompareAndSwapInt32(&conn.opened, 0, 1) { switch conn.protocol { case cherami.Protocol_WS: conn.logger.Infof("Using websocket to connect to output host %s", conn.connKey) @@ -145,7 +142,6 @@ func (conn *outputHostConnection) open() error { conn.wg.Add(1) go conn.writeAcksPump() - atomic.StoreInt32(&conn.opened, 1) conn.logger.Info("Output host connection opened.") } @@ -153,8 +149,6 @@ func (conn *outputHostConnection) open() error { } func (conn *outputHostConnection) close() { - conn.lk.Lock() - defer conn.lk.Unlock() if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) { select { @@ -165,7 +159,8 @@ func (conn *outputHostConnection) close() { close(conn.closeChannel) conn.closeAcksBatchCh() // necessary to shutdown writeAcksPump within the connection - conn.wg.Wait() // wait for the goroutines to finish up + + conn.wg.Wait() // wait for the goroutines to finish up conn.logger.Info("Output host connection closed.") } }