Skip to content

Commit

Permalink
Demonstrate correct way to ack from another goroutine.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Feb 5, 2025
1 parent 9072eb2 commit 1851ab4
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2272,39 +2272,48 @@ func TestMultiAckShouldNotCloseChannel_GH296(t *testing.T) {
}

signal := make(chan bool)
acks := make(chan *Delivery, messageCount)

go func() {
counter := 0
for msg := range msgs {
go worker(t, &msg)
counter = counter + 1
if counter >= messageCount {
signal <- true
for ;; {
select {
case msg := <-msgs:
t.Logf("starting worker for message: %d", msg.DeliveryTag)
go worker(t, acks, &msg)
case ack := <-acks:
ackError := ack.Ack(false)
if ackError != nil {
t.Logf("Ack error: %v", ackError)
}
counter = counter + 1
if counter >= messageCount {
signal <- true
return
}
case <-time.After(500 * time.Millisecond):
t.Log("timed out waiting to do something")
}
}
}()

select {
case connError := <-notifyConnClosed:
t.Logf("saw connection closure error: %v", connError)
break
case channelError := <-notifyChannelClosed:
t.Logf("saw channel closure error: %v", channelError)
break
case <-signal:
t.Logf("saw %d messages", messageCount)
break
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting to see %d messages", messageCount)
}
}

func worker(t *testing.T, msg *Delivery) {
func worker(t *testing.T, acks chan<- *Delivery, msg *Delivery) {
t.Logf("worker processing message: %d", msg.DeliveryTag)
err := msg.Ack(false)
if err != nil {
t.Logf("Ack error: %v", err)
}
time.Sleep(time.Millisecond * time.Duration(msg.DeliveryTag) * 100)
acks <- msg
t.Logf("worker done processing message: %d", msg.DeliveryTag)
}

/*
Expand Down

0 comments on commit 1851ab4

Please sign in to comment.