Skip to content

Commit

Permalink
Merge pull request #297 from rabbitmq/amqp091-go-296
Browse files Browse the repository at this point in the history
Investigate GH-296
  • Loading branch information
michaelklishin authored Feb 7, 2025
2 parents b3d409f + 2774d33 commit 566601c
Showing 1 changed file with 108 additions and 24 deletions.
132 changes: 108 additions & 24 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,114 @@ func TestDeliveryAckShouldReturnSpecificErrorOnClosedChannel(t *testing.T) {
}
}

// https://github.com/rabbitmq/amqp091-go/issues/11
func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11")
ch, err := conn.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}

conn.NotifyClose(make(chan *Error, 1))

_, err = ch.PublishWithDeferredConfirmWithContext(context.TODO(), "issue11", "issue11", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}

ch.Close()
conn.Close()

_, err = conn.Channel()
if err == nil {
t.Fatalf("Opening a channel from a closed connection should not block but returning an error %v", err)
}
}

// https://github.com/rabbitmq/amqp091-go/issues/296
func TestAckShouldNotCloseChannel_GH296(t *testing.T) {
// setup
const messageCount = 10
queueName := t.Name()
c, ch := integrationQueue(t, queueName)
defer ch.Close()
defer c.Close()

notifyConnClosed := make(chan *Error)
c.NotifyClose(notifyConnClosed)

notifyChannelClosed := make(chan *Error)
ch.NotifyClose(notifyChannelClosed)

for i := 0; i < messageCount; i++ {
err := ch.Publish(DefaultExchange, queueName, false, false, Publishing{
Body: []byte("this is a test"),
})
if err != nil {
t.Fatalf("publish error: %v", err)
}
}

err := ch.Qos(3, 0, false)
if err != nil {
t.Fatalf("Qos error: %v", err)
}

msgs, err := ch.Consume(
queueName, // queue
t.Name(), // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
t.Fatalf("Consume error: %v", err)
}

signal := make(chan int)
acks := make(chan *Delivery, messageCount)

go func() {
counter := 0
for {
select {
case msg := <-msgs:
go worker(acks, &msg)
case ack := <-acks:
ackError := ack.Ack(false)
if ackError != nil {
t.Logf("Ack error: %v", ackError)
}
counter = counter + 1
if counter >= messageCount {
signal <- counter
return
}
case <-time.After(500 * time.Millisecond):
t.Log("timed out waiting to do something")
}
}
}()

select {
case connError := <-notifyConnClosed:
t.Logf("saw connection closure error: %v", connError)
case channelError := <-notifyChannelClosed:
t.Logf("saw channel closure error: %v", channelError)
case count := <-signal:
t.Logf("saw %d messages", count)
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting to see %d messages", messageCount)
}
}

func worker(acks chan<- *Delivery, msg *Delivery) {
time.Sleep(time.Millisecond * time.Duration(msg.DeliveryTag) * 100)
acks <- msg
}

/*
* Support for integration tests
*/
Expand Down Expand Up @@ -2279,30 +2387,6 @@ func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg
return msg
}

// https://github.com/rabbitmq/amqp091-go/issues/11
func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T) {
conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11")
ch, err := conn.Channel()
if err != nil {
t.Fatalf("channel error: %v", err)
}

conn.NotifyClose(make(chan *Error, 1))

_, err = ch.PublishWithDeferredConfirmWithContext(context.TODO(), "issue11", "issue11", false, false, Publishing{Body: []byte("abc")})
if err != nil {
t.Fatalf("PublishWithDeferredConfirm error: %v", err)
}

ch.Close()
conn.Close()

_, err = conn.Channel()
if err == nil {
t.Fatalf("Opening a channel from a closed connection should not block but returning an error %v", err)
}
}

// Pulls out the CRC and verifies the remaining content against the CRC
func assertMessageCrc32(t *testing.T, msg []byte, assert string) {
t.Helper()
Expand Down

0 comments on commit 566601c

Please sign in to comment.