From 4fa1f1c7282e56dca4c0d7191867e1305e2ccbac Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 24 May 2022 05:31:09 -0700 Subject: [PATCH] Looks like this has a good chance of fixing the issue related to #85 --- channel.go | 21 ++++++++++++++------- integration_test.go | 2 -- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/channel.go b/channel.go index 5ad9546..1b34405 100644 --- a/channel.go +++ b/channel.go @@ -87,10 +87,17 @@ func newChannel(c *Connection, id uint16) *Channel { } } +// Signal that from now on, Channel.send() should call Channel.sendClosed() +func (ch *Channel) setClosed() { + atomic.StoreInt32(&ch.closed, 1) +} + // shutdown is called by Connection after the channel has been removed from the // connection registry. func (ch *Channel) shutdown(e *Error) { ch.destructor.Do(func() { + ch.setClosed() + ch.m.Lock() defer ch.m.Unlock() @@ -105,10 +112,6 @@ func (ch *Channel) shutdown(e *Error) { } } - // Signal that from now on, Channel.send() should call - // Channel.sendClosed() - atomic.StoreInt32(&ch.closed, 1) - // Notify RPC if we're selecting if e != nil { ch.errors <- e @@ -154,7 +157,7 @@ func (ch *Channel) shutdown(e *Error) { // only 'channel.close' is sent to the server. func (ch *Channel) send(msg message) (err error) { // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -231,7 +234,7 @@ func (ch *Channel) sendOpen(msg message) (err error) { } // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -266,7 +269,7 @@ func (ch *Channel) sendOpen(msg message) (err error) { } } else { // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -284,6 +287,10 @@ func (ch *Channel) sendOpen(msg message) (err error) { func (ch *Channel) dispatch(msg message) { switch m := msg.(type) { case *channelClose: + // Immediately indicate that this channel is closed to prevent + // invalid frames from being sent to the server + ch.setClosed() + // lock before sending connection.close-ok // to avoid unexpected interleaving with basic.publish frames if // publishing is happening concurrently diff --git a/integration_test.go b/integration_test.go index 3c1c854..ff7dcdc 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1567,8 +1567,6 @@ func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) { if j == 0 { t.Fatal("channel should not be closed") } - // TODO remove this debug log - t.Logf("channel is closed, i: %d j: %d", i, j) break } publishError := ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")})