From 8b6de9acfd4a54e690a18a94b5f08478517d65ad Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 26 May 2022 15:39:21 -0700 Subject: [PATCH] Revert test to demonstrate actual bug (#87) * Revert test to demonstrate actual bug Follow-up to #85 When this test fails, RabbitMQ logs the following connection exception: ``` 2022-05-24 11:00:12.747989+00:00 [error] <0.19502.2> Channel error on connection <0.19347.2> (172.17.0.1:46318 -> 172.17.0.2:5672, vhost: '/', user: 'guest'), channel 20: 2022-05-24 11:00:12.747989+00:00 [error] <0.19502.2> operation basic.publish caused a channel exception not_found: no exchange 'not-existing-exchange' in vhost '/' 2022-05-24 11:00:12.748614+00:00 [error] <0.19347.2> Error on AMQP connection <0.19347.2> (172.17.0.1:46318 -> 172.17.0.2:5672, vhost: '/', user: 'guest', state: running), channel 20: 2022-05-24 11:00:12.748614+00:00 [error] <0.19347.2> operation basic.publish caused a connection exception channel_error: "expected 'channel.open'" ``` * Extend number of iterations * Looks like this has a good chance of fixing the issue related to #85 * Indicate that a channel is closed immediately after decoding a channelClose frame * Close the channel prior to the Once call in the same manner as the Connection * No need to set closed again, wording * Convert to the correct error type, thanks to @Gsantomaggio * Conversion fixes --- channel.go | 20 ++++++++++------ connection.go | 3 +++ integration_test.go | 57 ++++++++++++++++++--------------------------- types.go | 12 ++++++++++ 4 files changed, 51 insertions(+), 41 deletions(-) diff --git a/channel.go b/channel.go index 5ad9546..111b109 100644 --- a/channel.go +++ b/channel.go @@ -87,9 +87,16 @@ func newChannel(c *Connection, id uint16) *Channel { } } +// Signal that from now on, Channel.send() should call Channel.sendClosed() +func (ch *Channel) setClosed() { + atomic.StoreInt32(&ch.closed, 1) +} + // shutdown is called by Connection after the channel has been removed from the // connection registry. func (ch *Channel) shutdown(e *Error) { + ch.setClosed() + ch.destructor.Do(func() { ch.m.Lock() defer ch.m.Unlock() @@ -105,10 +112,6 @@ func (ch *Channel) shutdown(e *Error) { } } - // Signal that from now on, Channel.send() should call - // Channel.sendClosed() - atomic.StoreInt32(&ch.closed, 1) - // Notify RPC if we're selecting if e != nil { ch.errors <- e @@ -154,7 +157,7 @@ func (ch *Channel) shutdown(e *Error) { // only 'channel.close' is sent to the server. func (ch *Channel) send(msg message) (err error) { // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -231,7 +234,7 @@ func (ch *Channel) sendOpen(msg message) (err error) { } // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -266,7 +269,7 @@ func (ch *Channel) sendOpen(msg message) (err error) { } } else { // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -284,6 +287,9 @@ func (ch *Channel) sendOpen(msg message) (err error) { func (ch *Channel) dispatch(msg message) { switch m := msg.(type) { case *channelClose: + // Note: channel state is set to closed immedately after the message is + // decoded by the Connection + // lock before sending connection.close-ok // to avoid unexpected interleaving with basic.publish frames if // publishing is happening concurrently diff --git a/connection.go b/connection.go index a36bde4..72a25a6 100644 --- a/connection.go +++ b/connection.go @@ -494,8 +494,11 @@ func (c *Connection) dispatch0(f frame) { func (c *Connection) dispatchN(f frame) { c.m.Lock() channel := c.channels[f.channel()] + updateChannel(f, channel) c.m.Unlock() + // Note: this could result in concurrent dispatch depending on + // how channels are managed in an application if channel != nil { channel.recv(channel, f) } else { diff --git a/integration_test.go b/integration_test.go index 9d884b3..2561c29 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1539,46 +1539,36 @@ func TestDeadlockConsumerIssue48(t *testing.T) { // https://github.com/streadway/amqp/issues/46 func TestRepeatedChannelExceptionWithPublishAndMaxProcsIssue46(t *testing.T) { - var conn *Connection = nil + conn := integrationConnection(t, "issue46") + if conn == nil { + t.Fatal("conn is nil") + } t.Cleanup(func() { - if conn != nil { - conn.Close() - } + conn.Close() }) for i := 0; i < 100; i++ { - if conn == nil || conn.IsClosed() { - conn = integrationConnection(t, "issue46") - if conn == nil { - t.Fatal("conn is nil") - } - } - - ch, err := conn.Channel() - if err, ok := err.(Error); ok { - if err.Code != 504 { - t.Fatalf("expected channel only exception i: %d got: %+v", i, err) - } + if conn.IsClosed() { + t.Fatal("conn is closed") } - if ch == nil { - continue + ch, channelOpenError := conn.Channel() + if channelOpenError != nil { + t.Fatalf("error opening channel: %d error: %+v", i, channelOpenError) } - for j := 0; j < 10; j++ { + for j := 0; j < 100; j++ { if ch.IsClosed() { + if j == 0 { + t.Fatal("channel should not be closed") + } break - } else { - err = ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")}) - if err, ok := err.(Error); ok { - if err.Code != 504 { - t.Fatalf("expected channel only exception i: %d j: %d got: %+v", i, j, err) - } - if cerr := ch.Close(); cerr != nil { - t.Logf("error on channel close i: %d j: %d got: %+v", i, j, cerr) - } - break + } + err := ch.Publish("not-existing-exchange", "some-key", false, false, Publishing{Body: []byte("some-data")}) + if err != nil { + if publishError, ok := err.(*Error); !ok || publishError.Code != 504 { + t.Fatalf("expected channel only exception i: %d j: %d error: %+v", i, j, publishError) } } } @@ -1769,13 +1759,12 @@ func TestExchangeDeclarePrecondition(t *testing.T) { if err == nil { t.Fatalf("Expected to fail a redeclare with different durability, didn't receive an error") - } - - if err, ok := err.(Error); ok { - if err.Code != PreconditionFailed { + } else { + declareErr := err.(*Error) + if declareErr.Code != PreconditionFailed { t.Fatalf("Expected precondition error") } - if !err.Recover { + if !declareErr.Recover { t.Fatalf("Expected to be able to recover") } } diff --git a/types.go b/types.go index 994473b..ba2ee3d 100644 --- a/types.go +++ b/types.go @@ -308,6 +308,18 @@ type frame interface { channel() uint16 } +/* +Perform any updates on the channel immediately after the frame is decoded while the +connection mutex is held. +*/ +func updateChannel(f frame, channel *Channel) { + if mf, isMethodFrame := f.(*methodFrame); isMethodFrame { + if _, isChannelClose := mf.Method.(*channelClose); isChannelClose { + channel.setClosed() + } + } +} + type reader struct { r io.Reader }