From 3f8415b9746c1053d7d7d8aa30735a860d6d6d1b Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Sat, 10 Feb 2024 00:37:25 +0000 Subject: [PATCH 1/3] Fixing a memory leak when doing cross receiver settlement. go-amqp holds onto some tracking data that won't get cleared if we don't try to settle through the original Receiver. Our fix in here, combined with go-amqp's changes to route to the original receiver, should seal that up. Benchmark added that also doubles as a stress test. --- sdk/messaging/azservicebus/amqp_message.go | 3 - sdk/messaging/azservicebus/client_test.go | 11 +- sdk/messaging/azservicebus/internal/mgmt.go | 4 +- .../internal/mock/emulation/events.go | 1 + .../azservicebus/internal/stress/Chart.lock | 6 +- .../azservicebus/internal/stress/Dockerfile | 3 + .../internal/stress/scenarios-matrix.yaml | 10 +- .../internal/stress/shared/streaming_batch.go | 17 +- .../internal/stress/shared/utils.go | 2 +- .../stress/templates/stress-test-job.yaml | 4 + .../benchmarks/backup_settlement_leak_test.go | 115 ++++++++++ .../stress/tests/benchmarks/main_test.go | 26 +++ .../stress/tests/benchmarks/readme.md | 9 + sdk/messaging/azservicebus/message.go | 13 +- sdk/messaging/azservicebus/messageSettler.go | 204 +++++++++++------- .../azservicebus/messageSettler_test.go | 120 ++++++++--- sdk/messaging/azservicebus/message_test.go | 37 +++- sdk/messaging/azservicebus/receiver.go | 14 +- .../azservicebus/receiver_simulated_test.go | 6 +- sdk/messaging/azservicebus/receiver_test.go | 2 +- sdk/messaging/azservicebus/sender_test.go | 5 +- 21 files changed, 452 insertions(+), 160 deletions(-) create mode 100644 sdk/messaging/azservicebus/internal/stress/tests/benchmarks/backup_settlement_leak_test.go create mode 100644 sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go create mode 100644 sdk/messaging/azservicebus/internal/stress/tests/benchmarks/readme.md diff --git a/sdk/messaging/azservicebus/amqp_message.go b/sdk/messaging/azservicebus/amqp_message.go index b96eb98d19c4..1fbc53c0d68c 100644 --- a/sdk/messaging/azservicebus/amqp_message.go +++ b/sdk/messaging/azservicebus/amqp_message.go @@ -57,8 +57,6 @@ type AMQPAnnotatedMessage struct { // Properties corresponds to the properties section of an AMQP message. Properties *AMQPAnnotatedMessageProperties - linkName string - // inner is the AMQP message we originally received, which contains some hidden // data that's needed to settle with go-amqp. We strip out most of the underlying // data so it's fairly minimal. @@ -273,7 +271,6 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName stri DeliveryTag: goAMQPMessage.DeliveryTag, Footer: footer, Header: header, - linkName: receivingLinkName, Properties: properties, inner: goAMQPMessage, } diff --git a/sdk/messaging/azservicebus/client_test.go b/sdk/messaging/azservicebus/client_test.go index e4eef534558a..7574e8ffa6f2 100644 --- a/sdk/messaging/azservicebus/client_test.go +++ b/sdk/messaging/azservicebus/client_test.go @@ -76,13 +76,12 @@ func TestNewClientWithAzureIdentity(t *testing.T) { receiver, err := client.NewReceiverForQueue(queue, nil) require.NoError(t, err) - actualSettler, _ := receiver.settler.(*messageSettler) - actualSettler.onlyDoBackupSettlement = true // this'll also exercise the management link messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil) require.NoError(t, err) require.EqualValues(t, []string{"hello - authenticating with a TokenCredential"}, getSortedBodies(messages)) + forceManagementSettlement(t, messages) for _, m := range messages { err = receiver.CompleteMessage(context.TODO(), m, nil) @@ -550,7 +549,7 @@ func TestNewClientUnitTests(t *testing.T) { MaxRetryDelay: 12 * time.Hour, }, receiver.retryOptions) - actualSettler := receiver.settler.(*messageSettler) + actualSettler := receiver.settler require.Equal(t, RetryOptions{ MaxRetries: 101, @@ -580,3 +579,9 @@ func assertRPCNotFound(t *testing.T, err error) { require.ErrorAs(t, err, &rpcError) require.Equal(t, http.StatusNotFound, rpcError.RPCCode()) } + +func forceManagementSettlement(t *testing.T, messages []*ReceivedMessage) { + for _, m := range messages { + m.settleOnMgmtLink = true + } +} diff --git a/sdk/messaging/azservicebus/internal/mgmt.go b/sdk/messaging/azservicebus/internal/mgmt.go index cf57ba06e004..fe27c433f651 100644 --- a/sdk/messaging/azservicebus/internal/mgmt.go +++ b/sdk/messaging/azservicebus/internal/mgmt.go @@ -374,10 +374,10 @@ func SetSessionState(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName str return nil } -// SendDisposition allows you settle a message using the management link, rather than via your +// SettleOnMgmtLink allows you settle a message using the management link, rather than via your // *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated // with a link (ex: deferred messages). -func SendDisposition(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error { +func SettleOnMgmtLink(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]any) error { if lockToken == nil { err := errors.New("lock token on the message is not set, thus cannot send disposition") return err diff --git a/sdk/messaging/azservicebus/internal/mock/emulation/events.go b/sdk/messaging/azservicebus/internal/mock/emulation/events.go index d34791841e05..4f73ff503024 100644 --- a/sdk/messaging/azservicebus/internal/mock/emulation/events.go +++ b/sdk/messaging/azservicebus/internal/mock/emulation/events.go @@ -68,6 +68,7 @@ const ( DispositionTypeAccept DispositionType = "accept" DispositionTypeReject DispositionType = "reject" DispositionTypeRelease DispositionType = "release" + DispositionTypeModify DispositionType = "modify" // used for abandoning a message ) type DispositionEvent struct { diff --git a/sdk/messaging/azservicebus/internal/stress/Chart.lock b/sdk/messaging/azservicebus/internal/stress/Chart.lock index 47b24906d1ce..201277cd95ed 100644 --- a/sdk/messaging/azservicebus/internal/stress/Chart.lock +++ b/sdk/messaging/azservicebus/internal/stress/Chart.lock @@ -1,6 +1,6 @@ dependencies: - name: stress-test-addons repository: https://stresstestcharts.blob.core.windows.net/helm/ - version: 0.3.0 -digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e -generated: "2023-09-26T11:43:56.706771668-07:00" + version: 0.3.1 +digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d +generated: "2024-02-05T17:21:31.510400504-08:00" diff --git a/sdk/messaging/azservicebus/internal/stress/Dockerfile b/sdk/messaging/azservicebus/internal/stress/Dockerfile index 8a1b2a5f3065..6d0ac4654aaf 100644 --- a/sdk/messaging/azservicebus/internal/stress/Dockerfile +++ b/sdk/messaging/azservicebus/internal/stress/Dockerfile @@ -6,6 +6,8 @@ ENV CGO_ENABLED=0 ADD . /src WORKDIR /src/internal/stress RUN go build -o stress . +WORKDIR /src/internal/stress/tests/benchmarks +RUN go test -c # The first container is just for building the artifacts, and contains all the source, etc... # That container instance only ever lives on your local machine (or the build machine). @@ -15,5 +17,6 @@ RUN go build -o stress . FROM mcr.microsoft.com/cbl-mariner/base/core:2.0 WORKDIR /app COPY --from=build /src/internal/stress/stress /app/stress +COPY --from=build /src/internal/stress/tests/benchmarks/benchmarks.test /app/benchmarks.test RUN yum update -y && yum install -y ca-certificates ENTRYPOINT ["/bin/bash"] diff --git a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml index f53cefe43733..5bfc5f893d50 100644 --- a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml +++ b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml @@ -1,5 +1,5 @@ displayNames: - # this makes it so these don't show up in the scenario names, + # this makes it so these don't show up in the scenario names, # since they're just clutter. 1.5Gi": "" 4Gi": "" @@ -23,7 +23,7 @@ matrix: testTarget: finitePeeks memory: "0.5Gi" finiteSendAndReceive: - testTarget: finiteSendAndReceive + testTarget: finiteSendAndReceive memory: "0.5Gi" finiteSessions: testTarget: finiteSessions @@ -52,10 +52,14 @@ matrix: memory: "0.5Gi" rapidOpenClose: testTarget: rapidOpenClose - memory: "0.5Gi" + memory: "0.5Gi" receiveCancellation: testTarget: receiveCancellation memory: "0.5Gi" sendAndReceiveDrain: testTarget: sendAndReceiveDrain memory: "0.5Gi" + benchmarkBackupSettlementLeak: + benchmark: true + testTarget: "BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive" + memory: "1.0Gi" diff --git a/sdk/messaging/azservicebus/internal/stress/shared/streaming_batch.go b/sdk/messaging/azservicebus/internal/stress/shared/streaming_batch.go index b622bd4078f9..b57c998f602c 100644 --- a/sdk/messaging/azservicebus/internal/stress/shared/streaming_batch.go +++ b/sdk/messaging/azservicebus/internal/stress/shared/streaming_batch.go @@ -34,7 +34,7 @@ func (sw *senderWrapper) NewMessageBatch(ctx context.Context, options *azservice return sw.inner.NewMessageBatch(ctx, options) } -func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender) (*StreamingMessageBatch, error) { +func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error) { batch, err := sender.NewMessageBatch(ctx, nil) if err != nil { @@ -42,14 +42,17 @@ func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender) ( } return &StreamingMessageBatch{ - sender: sender, - currentBatch: batch, + sender: sender, + currentBatch: batch, + expectedTotal: expectedTotal, }, nil } type StreamingMessageBatch struct { - sender internalBatchSender - currentBatch internalBatch + sender internalBatchSender + currentBatch internalBatch + expectedTotal int + total int } // Add appends to the current batch. If it's full it'll send it, allocate a new one. @@ -65,11 +68,13 @@ func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Mess return err } - log.Printf("Sending message batch (%d messages)", sb.currentBatch.NumMessages()) + log.Printf("Sending message batch with %d messages. %d/%d messages sent so far.", sb.currentBatch.NumMessages(), sb.total, sb.expectedTotal) if err := sb.sender.SendMessageBatch(ctx, sb.currentBatch); err != nil { return err } + sb.total += int(sb.currentBatch.NumMessages()) + // throttle a teeny bit. time.Sleep(time.Second) diff --git a/sdk/messaging/azservicebus/internal/stress/shared/utils.go b/sdk/messaging/azservicebus/internal/stress/shared/utils.go index 3f2d5d7a9bd8..577e6349cc83 100644 --- a/sdk/messaging/azservicebus/internal/stress/shared/utils.go +++ b/sdk/messaging/azservicebus/internal/stress/shared/utils.go @@ -30,7 +30,7 @@ func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimi log.Printf("Sending %d messages", messageLimit) - streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender}) + streamingBatch, err := NewStreamingMessageBatch(ctx, &senderWrapper{inner: sender}, messageLimit) sc.PanicOnError("failed to create streaming batch", err) extraBytes := make([]byte, numExtraBytes) diff --git a/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml b/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml index 5fb46c12b168..978e70871b3c 100644 --- a/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml +++ b/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml @@ -17,7 +17,11 @@ spec: - > set -ex; mkdir -p "$DEBUG_SHARE"; + {{ if ne .Stress.benchmark true }} /app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; + {{ else }} + /app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; + {{ end }} # Pulls the image on pod start, always. We tend to push to the same image and tag over and over again # when iterating, so this is a must. imagePullPolicy: Always diff --git a/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/backup_settlement_leak_test.go b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/backup_settlement_leak_test.go new file mode 100644 index 000000000000..3db686ea0b25 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/backup_settlement_leak_test.go @@ -0,0 +1,115 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package benchmarks + +import ( + "context" + "fmt" + "log" + "math" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared" +) + +// BackupSettlementLeak checks that, when we use backup settlement, that we're not +// leaking memory. This came up in a couple of issues for a customer: +// - https://github.com/Azure/azure-sdk-for-go/issues/22318 +// - https://github.com/Azure/azure-sdk-for-go/issues/22157 +// +// The use case for backup settlement is for when the original link we've received +// on has gone offline, so we need to settle via the management$ link instead. However, +// the underlying go-amqp link is tracking several bits of state for the message which +// will never get cleared. Since that receiver was dead it was going to get garbage +// collected anyways, so this was non-issue. +// +// This customer's use case was slightly different - they were completing on a separate +// receiver even when the original receiving link was still alive. This means the memory +// leak is just accumulating and never gets garbage collected since there's no trigger +// to know when to clear out any tracking state for the message. +func BenchmarkBackupSettlementLeakWhileOldReceiverStillAlive(b *testing.B) { + b.StopTimer() + + sc := shared.MustCreateStressContext("BenchmarkBackupSettlementLeak", nil) + defer sc.End() + + sent := int64(100000) + + client, queueName := mustInitBenchmarkBackupSettlementLeak(sc, b, int(sent)) + + oldReceiver, err := client.NewReceiverForQueue(queueName, nil) + sc.NoError(err) + + newReceiver, err := client.NewReceiverForQueue(queueName, nil) + sc.NoError(err) + + b.StartTimer() + + var completed int64 + expected := maxDeliveryCount * int64(sent) + + for completed < expected { + // receive from the old receiver and... + receiveCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + + messages, err := oldReceiver.ReceiveMessages(receiveCtx, int(math.Min(float64(expected-completed), 5000)), &azservicebus.ReceiveMessagesOptions{ + // not super scientific - mostly just want to get slightly fuller batches + TimeAfterFirstMessage: 30 * time.Second, + }) + cancel() + sc.NoError(err) + + wg := sync.WaitGroup{} + wg.Add(len(messages)) + + // ...completing on another receiver + for _, m := range messages { + m := m + + go func() { + defer wg.Done() + + // abandon it so we see the message a few times (until it's deadlettered after 10 tries) + err := newReceiver.AbandonMessage(context.Background(), m, nil) + sc.NoError(err) + atomic.AddInt64(&completed, 1) + }() + } + + wg.Wait() + + b.Logf("Settled %d/%d", completed, sent) + } + + log.Printf("Forcing garbage collection\n") + runtime.GC() + log.Printf("Done with collection\n") + time.Sleep(1 * time.Minute) +} + +func mustInitBenchmarkBackupSettlementLeak(sc *shared.StressContext, b *testing.B, numToSend int) (*azservicebus.Client, string) { + queueName := fmt.Sprintf("backup-settlement-tester-%s", sc.Nano) + shared.MustCreateAutoDeletingQueue(sc, queueName, &admin.QueueProperties{ + MaxDeliveryCount: to.Ptr[int32](maxDeliveryCount), + }) + + client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil) + sc.PanicOnError("failed to create client", err) + + sender, err := shared.NewTrackingSender(sc.TC, client, queueName, nil) + sc.PanicOnError("create a sender", err) + + shared.MustGenerateMessages(sc, sender, numToSend, 0) + + return client, queueName +} + +const maxDeliveryCount = 20 diff --git a/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go new file mode 100644 index 000000000000..8dd88b60f80c --- /dev/null +++ b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package benchmarks + +import ( + "log" + "os" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared" +) + +func TestMain(m *testing.M) { + if os.Getenv("ENV_FILE") == "" { + os.Setenv("ENV_FILE", "../../../../.env") + } + + err := shared.LoadEnvironment() + + if err != nil { + log.Fatal(err) + } + + os.Exit(m.Run()) +} diff --git a/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/readme.md b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/readme.md new file mode 100644 index 000000000000..509f68b46914 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/readme.md @@ -0,0 +1,9 @@ +## Generating + +`go test -memprofile mem.out -bench .` + +## Visualizing + +Run: +* `sudo apt install graphviz` +* `go tool pprof -http localhost:8000 -base mem.out.before_fix mem.out.after_fix` diff --git a/sdk/messaging/azservicebus/message.go b/sdk/messaging/azservicebus/message.go index 90c22a446545..5fbe586d864c 100644 --- a/sdk/messaging/azservicebus/message.go +++ b/sdk/messaging/azservicebus/message.go @@ -9,6 +9,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/go-amqp" ) @@ -126,10 +127,9 @@ type ReceivedMessage struct { // and Header fields. RawAMQPMessage *AMQPAnnotatedMessage - // deferred indicates we received it using ReceiveDeferredMessages. These messages - // will still go through the normal Receiver.Settle functions but internally will - // always be settled with the management link. - deferred bool + linkName string // used when we call into the management link. It counts towards a link not being considered idle. + + settleOnMgmtLink bool // used for cases like when a message is received that was deferred. It can only be settled on the management link. } // Message creates a shallow copy of the fields from this message to an instance of @@ -310,10 +310,11 @@ func (m *Message) toAMQPMessage() *amqp.Message { // newReceivedMessage creates a received message from an AMQP message. // NOTE: this converter assumes that the Body of this message will be the first // serialized byte array in the Data section of the messsage. -func newReceivedMessage(amqpMsg *amqp.Message, receivingLinkName string) *ReceivedMessage { +func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) *ReceivedMessage { msg := &ReceivedMessage{ - RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receivingLinkName), + RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receiver.LinkName()), State: MessageStateActive, + linkName: receiver.LinkName(), } if len(msg.RawAMQPMessage.Body.Data) == 1 { diff --git a/sdk/messaging/azservicebus/messageSettler.go b/sdk/messaging/azservicebus/messageSettler.go index 915cc020d905..e246b823c504 100644 --- a/sdk/messaging/azservicebus/messageSettler.go +++ b/sdk/messaging/azservicebus/messageSettler.go @@ -5,6 +5,7 @@ package azservicebus import ( "context" + "errors" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" @@ -12,34 +13,24 @@ import ( "github.com/Azure/go-amqp" ) -type settler interface { - CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error - AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error - DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error - DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error -} - type messageSettler struct { links internal.AMQPLinks retryOptions RetryOptions - // used only for tests - onlyDoBackupSettlement bool + // these are used for testing so we can tell which paths we tried to settle. + notifySettleOnLink func(message *ReceivedMessage) + notifySettleOnManagement func(message *ReceivedMessage) } -func newMessageSettler(links internal.AMQPLinks, retryOptions RetryOptions) settler { +func newMessageSettler(links internal.AMQPLinks, retryOptions RetryOptions) *messageSettler { return &messageSettler{ - links: links, - retryOptions: retryOptions, + links: links, + retryOptions: retryOptions, + notifySettleOnLink: func(message *ReceivedMessage) {}, + notifySettleOnManagement: func(message *ReceivedMessage) {}, } } -func (s *messageSettler) useManagementLink(m *ReceivedMessage, receiver amqpwrap.AMQPReceiver) bool { - return s.onlyDoBackupSettlement || - m.deferred || - m.RawAMQPMessage.linkName != receiver.LinkName() -} - func (s *messageSettler) settleWithRetries(ctx context.Context, message *ReceivedMessage, settleFn func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error) error { if s == nil { return internal.NewErrNonRetriable("messages that are received in `ReceiveModeReceiveAndDelete` mode are not settleable") @@ -62,13 +53,22 @@ type CompleteMessageOptions struct { } // CompleteMessage completes a message, deleting it from the queue or subscription. -func (s *messageSettler) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error { - return s.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { - if s.useManagementLink(message, receiver) { - return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil) - } else { - return receiver.AcceptMessage(ctx, message.RawAMQPMessage.inner) +func (ms *messageSettler) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error { + return ms.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { + var err error + + if shouldSettleOnReceiver(message) { + ms.notifySettleOnLink(message) + err = receiver.AcceptMessage(ctx, message.RawAMQPMessage.inner) } + + if shouldSettleOnMgmtLink(err, message) { + ms.notifySettleOnManagement(message) + return internal.SettleOnMgmtLink(ctx, rpcLink, receiver.LinkName(), + bytesToAMQPUUID(message.LockToken), internal.Disposition{Status: internal.CompletedDisposition}, nil) + } + + return err }) } @@ -81,9 +81,29 @@ type AbandonMessageOptions struct { // AbandonMessage will cause a message to be returned to the queue or subscription. // This will increment its delivery count, and potentially cause it to be dead lettered // depending on your queue or subscription's configuration. -func (s *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error { - return s.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { - if s.useManagementLink(message, receiver) { +func (ms *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error { + return ms.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { + var err error + + if shouldSettleOnReceiver(message) { + ms.notifySettleOnLink(message) + + var annotations amqp.Annotations + + if options != nil { + annotations = newAnnotations(options.PropertiesToModify) + } + + err = receiver.ModifyMessage(ctx, message.RawAMQPMessage.inner, &amqp.ModifyMessageOptions{ + DeliveryFailed: false, + UndeliverableHere: false, + Annotations: annotations, + }) + } + + if shouldSettleOnMgmtLink(err, message) { + ms.notifySettleOnManagement(message) + d := internal.Disposition{ Status: internal.AbandonedDisposition, } @@ -94,20 +114,10 @@ func (s *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedMe propertiesToModify = options.PropertiesToModify } - return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) - } - - var annotations amqp.Annotations - - if options != nil { - annotations = newAnnotations(options.PropertiesToModify) + return internal.SettleOnMgmtLink(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) } - return receiver.ModifyMessage(ctx, message.RawAMQPMessage.inner, &amqp.ModifyMessageOptions{ - DeliveryFailed: false, - UndeliverableHere: false, - Annotations: annotations, - }) + return err }) } @@ -119,9 +129,30 @@ type DeferMessageOptions struct { // DeferMessage will cause a message to be deferred. Deferred messages // can be received using `Receiver.ReceiveDeferredMessages`. -func (s *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error { - return s.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { - if s.useManagementLink(message, receiver) { +func (ms *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error { + return ms.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { + var err error + + if shouldSettleOnReceiver(message) { + ms.notifySettleOnLink(message) + + var annotations amqp.Annotations + + if options != nil { + annotations = newAnnotations(options.PropertiesToModify) + } + + err = receiver.ModifyMessage(ctx, message.RawAMQPMessage.inner, + &amqp.ModifyMessageOptions{ + DeliveryFailed: false, + UndeliverableHere: true, + Annotations: annotations, + }) + } + + if shouldSettleOnMgmtLink(err, message) { + ms.notifySettleOnManagement(message) + d := internal.Disposition{ Status: internal.DeferredDisposition, } @@ -132,21 +163,10 @@ func (s *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMess propertiesToModify = options.PropertiesToModify } - return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) + return internal.SettleOnMgmtLink(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) } - var annotations amqp.Annotations - - if options != nil { - annotations = newAnnotations(options.PropertiesToModify) - } - - return receiver.ModifyMessage(ctx, message.RawAMQPMessage.inner, - &amqp.ModifyMessageOptions{ - DeliveryFailed: false, - UndeliverableHere: true, - Annotations: annotations, - }) + return err }) } @@ -166,8 +186,8 @@ type DeadLetterOptions struct { // DeadLetterMessage settles a message by moving it to the dead letter queue for a // queue or subscription. To receive these messages create a receiver with `Client.NewReceiver()` // using the `SubQueue` option. -func (s *messageSettler) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error { - return s.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { +func (ms *messageSettler) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error { + return ms.settleWithRetries(ctx, message, func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error { reason := "" description := "" @@ -181,7 +201,33 @@ func (s *messageSettler) DeadLetterMessage(ctx context.Context, message *Receive } } - if s.useManagementLink(message, receiver) { + var err error + + if shouldSettleOnReceiver(message) { + ms.notifySettleOnLink(message) + + info := map[string]any{ + "DeadLetterReason": reason, + "DeadLetterErrorDescription": description, + } + + if options != nil && options.PropertiesToModify != nil { + for key, val := range options.PropertiesToModify { + info[key] = val + } + } + + amqpErr := amqp.Error{ + Condition: "com.microsoft:dead-letter", + Info: info, + } + + err = receiver.RejectMessage(ctx, message.RawAMQPMessage.inner, &amqpErr) + } + + if shouldSettleOnMgmtLink(err, message) { + ms.notifySettleOnManagement(message) + d := internal.Disposition{ Status: internal.SuspendedDisposition, DeadLetterDescription: &description, @@ -194,26 +240,10 @@ func (s *messageSettler) DeadLetterMessage(ctx context.Context, message *Receive propertiesToModify = options.PropertiesToModify } - return internal.SendDisposition(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) - } - - info := map[string]any{ - "DeadLetterReason": reason, - "DeadLetterErrorDescription": description, - } - - if options != nil && options.PropertiesToModify != nil { - for key, val := range options.PropertiesToModify { - info[key] = val - } - } - - amqpErr := amqp.Error{ - Condition: "com.microsoft:dead-letter", - Info: info, + return internal.SettleOnMgmtLink(ctx, rpcLink, receiver.LinkName(), bytesToAMQPUUID(message.LockToken), d, propertiesToModify) } - return receiver.RejectMessage(ctx, message.RawAMQPMessage.inner, &amqpErr) + return err }) } @@ -235,3 +265,25 @@ func newAnnotations(propertiesToModify map[string]any) amqp.Annotations { return annotations } + +func shouldSettleOnReceiver(message *ReceivedMessage) bool { + // deferred messages always go through the management link + return !message.settleOnMgmtLink +} + +func shouldSettleOnMgmtLink(settlementErr error, message *ReceivedMessage) bool { + if message.settleOnMgmtLink { + // deferred messages always go through the management link + return true + } + + if settlementErr == nil { + // we settled on the original receiver + return false + } + + // if we got a connection or link error we can try settling against the mgmt link since + // our original receiver is gone. + var linkErr *amqp.LinkError + return errors.As(settlementErr, &linkErr) +} diff --git a/sdk/messaging/azservicebus/messageSettler_test.go b/sdk/messaging/azservicebus/messageSettler_test.go index 843a2151abed..1af8a7b985ec 100644 --- a/sdk/messaging/azservicebus/messageSettler_test.go +++ b/sdk/messaging/azservicebus/messageSettler_test.go @@ -9,6 +9,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test" "github.com/stretchr/testify/require" ) @@ -203,52 +204,103 @@ func TestDeferredMessage_DeadLettering(t *testing.T) { } func TestMessageSettlementUsingOnlyBackupSettlement(t *testing.T) { - testStuff := newTestStuff(t) - defer testStuff.Close() + newConn, cleanup, queueName := setupLiveTest(t, nil) + t.Cleanup(cleanup) - actualSettler, _ := testStuff.Receiver.settler.(*messageSettler) - actualSettler.onlyDoBackupSettlement = true + sender, err := newConn.NewSender(queueName, nil) + require.NoError(t, err) - actualSettler, _ = testStuff.DeadLetterReceiver.settler.(*messageSettler) - actualSettler.onlyDoBackupSettlement = true + runTest := func(t *testing.T, whichToClose string, settlementFn func(*Receiver, *ReceivedMessage) error) { + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte("hello"), + }, nil) + require.NoError(t, err) - receiver, deadLetterReceiver := testStuff.Receiver, testStuff.DeadLetterReceiver - ctx := context.TODO() + oldConn, err := NewClientFromConnectionString(test.GetConnectionString(t), nil) + require.NoError(t, err) + defer test.RequireClose(t, oldConn) - err := testStuff.Sender.SendMessage(context.Background(), &Message{ - Body: []byte("hello"), - }, nil) - require.NoError(t, err) + oldReceiver, err := oldConn.NewReceiverForQueue(queueName, nil) + require.NoError(t, err) + defer test.RequireClose(t, oldReceiver) - // toggle the super secret switch - actualSettler, _ = receiver.settler.(*messageSettler) - actualSettler.onlyDoBackupSettlement = true + messages, err := oldReceiver.ReceiveMessages(context.Background(), 1, nil) + require.NoError(t, err) - messages, err := receiver.ReceiveMessages(ctx, 1, nil) - require.NoError(t, err) - require.EqualValues(t, 1, messages[0].DeliveryCount) + switch whichToClose { + case "connection": + test.RequireClose(t, oldConn) + case "receiver": + test.RequireClose(t, oldReceiver) + case "": + // don't close anything. + default: + panic("Invalid `whichToClose` value " + whichToClose) + } + + newReceiver, err := newConn.NewReceiverForQueue(queueName, nil) + require.NoError(t, err) + defer test.RequireClose(t, newReceiver) - err = receiver.AbandonMessage(context.Background(), messages[0], nil) - require.NoError(t, err) + onLink := false + onMgmt := false - messages, err = receiver.ReceiveMessages(ctx, 1, nil) - require.NoError(t, err) - require.EqualValues(t, 2, messages[0].DeliveryCount) + newReceiver.settler.notifySettleOnLink = func(message *ReceivedMessage) { onLink = true } + newReceiver.settler.notifySettleOnManagement = func(message *ReceivedMessage) { onMgmt = true } - err = receiver.DeadLetterMessage(ctx, messages[0], nil) - require.NoError(t, err) + // old receiver is still open, so the settlement will occur there. + err = settlementFn(newReceiver, messages[0]) + require.NoError(t, err) - messages, err = deadLetterReceiver.ReceiveMessages(ctx, 1, nil) - require.NoError(t, err) - require.EqualValues(t, 2, messages[0].DeliveryCount) + switch whichToClose { + case "connection": + // we try to settle on the original link (and the entire connection is dead) so we fallback to the management link + require.True(t, onLink) + require.True(t, onMgmt) + case "receiver": + // we try to settle on the original link (which is dead) so we fallback to the management link + require.True(t, onLink) + require.True(t, onMgmt) + case "": + // original link was still alive so we can settle against it. No backup settlement required. + require.True(t, onLink) + require.False(t, onMgmt) + default: + panic("Invalid `whichToClose` value " + whichToClose) + } + } - err = deadLetterReceiver.CompleteMessage(context.Background(), messages[0], nil) - require.NoError(t, err) -} + tests := []struct { + Name string + F func(*Receiver, *ReceivedMessage) error + }{ + {"Abandon", func(r *Receiver, rm *ReceivedMessage) error { + return r.AbandonMessage(context.Background(), rm, nil) + }}, + {"Complete", func(r *Receiver, rm *ReceivedMessage) error { + return r.CompleteMessage(context.Background(), rm, nil) + }}, + {"DeadLetter", func(r *Receiver, rm *ReceivedMessage) error { + return r.DeadLetterMessage(context.Background(), rm, nil) + }}, + {"Defer", func(r *Receiver, rm *ReceivedMessage) error { + return r.DeferMessage(context.Background(), rm, nil) + }}, + } -func TestMessageSettlementWithDeferral(t *testing.T) { - testStuff := newTestStuff(t) - defer testStuff.Close() + for _, test := range tests { + t.Run(test.Name+"_OriginalReceiverAlive", func(t *testing.T) { + runTest(t, "", test.F) + }) + + t.Run(test.Name+"_OriginalReceiverDead", func(t *testing.T) { + runTest(t, "receiver", test.F) + }) + + t.Run(test.Name+"_OriginalConnDead", func(t *testing.T) { + runTest(t, "connection", test.F) + }) + } } type testStuff struct { diff --git a/sdk/messaging/azservicebus/message_test.go b/sdk/messaging/azservicebus/message_test.go index 5140a39a712b..30105f202abe 100644 --- a/sdk/messaging/azservicebus/message_test.go +++ b/sdk/messaging/azservicebus/message_test.go @@ -9,7 +9,9 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/mock" "github.com/Azure/go-amqp" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -48,9 +50,14 @@ func TestMessageUnitTest(t *testing.T) { } func TestAMQPMessageToReceivedMessage(t *testing.T) { + ctrl := gomock.NewController(t) + receiver := mock.NewMockAMQPReceiver(ctrl) + receiver.EXPECT().LinkName().Return("receiving_link").AnyTimes() + t.Run("empty_message", func(t *testing.T) { // nothing should blow up. - rm := newReceivedMessage(&amqp.Message{}, "receiving_link") + + rm := newReceivedMessage(&amqp.Message{}, receiver) require.NotNil(t, rm) }) @@ -73,7 +80,7 @@ func TestAMQPMessageToReceivedMessage(t *testing.T) { }, } - receivedMessage := newReceivedMessage(amqpMessage, "receiving_link") + receivedMessage := newReceivedMessage(amqpMessage, receiver) require.Equal(t, []byte("hello"), receivedMessage.Body) require.EqualValues(t, lockedUntil, *receivedMessage.LockedUntil) @@ -134,7 +141,11 @@ func TestAMQPMessageToMessage(t *testing.T) { Data: [][]byte{[]byte("foo")}, } - msg := newReceivedMessage(amqpMsg, "receiving_link") + ctrl := gomock.NewController(t) + receiver := mock.NewMockAMQPReceiver(ctrl) + receiver.EXPECT().LinkName().Return("receiving_link").AnyTimes() + + msg := newReceivedMessage(amqpMsg, receiver) require.EqualValues(t, msg.MessageID, amqpMsg.Properties.MessageID, "messageID") require.EqualValues(t, msg.SessionID, amqpMsg.Properties.GroupID, "groupID") @@ -159,6 +170,10 @@ func TestAMQPMessageToMessage(t *testing.T) { } func TestMessageState(t *testing.T) { + ctrl := gomock.NewController(t) + receiver := mock.NewMockAMQPReceiver(ctrl) + receiver.EXPECT().LinkName().Return("receiving_link").AnyTimes() + testData := []struct { PropValue any Expected MessageState @@ -179,7 +194,7 @@ func TestMessageState(t *testing.T) { Annotations: amqp.Annotations{ messageStateAnnotation: td.PropValue, }, - }, "receiving_link") + }, receiver) require.EqualValues(t, td.Expected, m.State) }) } @@ -187,25 +202,29 @@ func TestMessageState(t *testing.T) { t.Run("NoAnnotations", func(t *testing.T) { m := newReceivedMessage(&amqp.Message{ Annotations: nil, - }, "receiving_link") + }, receiver) require.EqualValues(t, MessageStateActive, m.State) }) } func TestMessageWithIncorrectBody(t *testing.T) { + ctrl := gomock.NewController(t) + receiver := mock.NewMockAMQPReceiver(ctrl) + receiver.EXPECT().LinkName().Return("receiving_link").AnyTimes() + // these are cases where the simple ReceivedMessage can't represent the AMQP message's // payload. - message := newReceivedMessage(&amqp.Message{}, "receiving_link") + message := newReceivedMessage(&amqp.Message{}, receiver) require.Nil(t, message.Body) message = newReceivedMessage(&amqp.Message{ Value: "hello", - }, "receiving_link") + }, receiver) require.Nil(t, message.Body) message = newReceivedMessage(&amqp.Message{ Sequence: [][]any{}, - }, "receiving_link") + }, receiver) require.Nil(t, message.Body) message = newReceivedMessage(&amqp.Message{ @@ -213,7 +232,7 @@ func TestMessageWithIncorrectBody(t *testing.T) { []byte("hello"), []byte("world"), }, - }, "receiving_link") + }, receiver) require.Nil(t, message.Body) } diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index a03395e10854..86012c27cbc7 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -54,7 +54,7 @@ type Receiver struct { receiveMode ReceiveMode receiving bool retryOptions RetryOptions - settler settler + settler *messageSettler } // ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` @@ -225,8 +225,8 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers } for _, amqpMsg := range amqpMessages { - receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver.LinkName()) - receivedMsg.deferred = true + receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver) + receivedMsg.settleOnMgmtLink = true receivedMessages = append(receivedMessages, receivedMsg) } @@ -277,7 +277,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option receivedMessages = make([]*ReceivedMessage, len(messages)) for i := 0; i < len(messages); i++ { - receivedMessages[i] = newReceivedMessage(messages[i], links.Receiver.LinkName()) + receivedMessages[i] = newReceivedMessage(messages[i], links.Receiver) } if len(receivedMessages) > 0 && updateInternalSequenceNumber { @@ -300,7 +300,7 @@ type RenewMessageLockOptions struct { // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, options *RenewMessageLockOptions) error { err := r.amqpLinks.Retry(ctx, EventReceiver, "renewMessageLock", func(ctx context.Context, linksWithVersion *internal.LinksWithID, args *utils.RetryFnArgs) error { - newExpirationTime, err := internal.RenewLocks(ctx, linksWithVersion.RPC, msg.RawAMQPMessage.linkName, []amqp.UUID{ + newExpirationTime, err := internal.RenewLocks(ctx, linksWithVersion.RPC, msg.linkName, []amqp.UUID{ (amqp.UUID)(msg.LockToken), }) @@ -332,7 +332,7 @@ func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage return r.settler.CompleteMessage(ctx, message, options) } -// AbandonMessage will cause a message to be available again from the queue or subscription. +// AbandonMessage will cause a message to be available again from the queue or subscription. // This will increment its delivery count, and potentially cause it to be dead-lettered // depending on your queue or subscription's configuration. // This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`. @@ -445,7 +445,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt var receivedMessages []*ReceivedMessage for _, msg := range result.Messages { - receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver.LinkName())) + receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver)) } return receivedMessages, nil diff --git a/sdk/messaging/azservicebus/receiver_simulated_test.go b/sdk/messaging/azservicebus/receiver_simulated_test.go index b2444e3cb22a..44712d134a9d 100644 --- a/sdk/messaging/azservicebus/receiver_simulated_test.go +++ b/sdk/messaging/azservicebus/receiver_simulated_test.go @@ -361,8 +361,6 @@ func TestReceiver_UserFacingErrors(t *testing.T) { messages, err = receiver.ReceiveMessages(context.Background(), 1, nil) require.NoError(t, err) require.Empty(t, messages) - // require.ErrorAs(t, err, &asSBError) - // require.Equal(t, CodeConnectionLost, asSBError.Code) receiveErr = internal.RPCError{Resp: &amqpwrap.RPCResponse{Code: internal.RPCResponseCodeLockLost}} @@ -372,8 +370,10 @@ func TestReceiver_UserFacingErrors(t *testing.T) { msg := &ReceivedMessage{ LockToken: id, RawAMQPMessage: &AMQPAnnotatedMessage{ - linkName: "linkName", + inner: &amqp.Message{}, }, + linkName: "link-name", + settleOnMgmtLink: true, } err = receiver.AbandonMessage(context.Background(), msg, nil) diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index ac1870ec31ca..89425e19601e 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -308,7 +308,7 @@ func TestReceiverDeferAndReceiveDeferredMessages(t *testing.T) { require.NoError(t, err) require.EqualValues(t, []string{"deferring a message"}, getSortedBodies(deferredMessages)) - require.True(t, deferredMessages[0].deferred, "internal flag indicating it was from a deferred receiver method is set") + require.True(t, deferredMessages[0].settleOnMgmtLink, "deferred messages should always settle on the management link") for _, m := range deferredMessages { err = receiver.CompleteMessage(ctx, m, nil) diff --git a/sdk/messaging/azservicebus/sender_test.go b/sdk/messaging/azservicebus/sender_test.go index 3f74e3a77a7d..6adc8b37b8cc 100644 --- a/sdk/messaging/azservicebus/sender_test.go +++ b/sdk/messaging/azservicebus/sender_test.go @@ -550,8 +550,9 @@ func TestSender_SendAMQPMessageWithMultipleByteSlicesInData(t *testing.T) { require.Nil(t, m.Body) require.NotEmpty(t, m.RawAMQPMessage.DeliveryTag) + require.NoError(t, receiver.CompleteMessage(context.Background(), messages[0], nil)) + // kill some fields that aren't important for this comparison - m.RawAMQPMessage.linkName = "" m.RawAMQPMessage.inner = nil require.Equal(t, &AMQPAnnotatedMessage{ @@ -586,8 +587,6 @@ func TestSender_SendAMQPMessageWithMultipleByteSlicesInData(t *testing.T) { // as we're not trying to test those necessarily DeliveryTag: m.RawAMQPMessage.DeliveryTag, }, messages[0].RawAMQPMessage) - - require.NoError(t, receiver.CompleteMessage(context.Background(), messages[0], nil)) } func TestSender_SendAMQPMessageWithValue(t *testing.T) { From 6d47b47ca6c0237b4a784c93606ff3a768adc360 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 13 Feb 2024 00:21:26 +0000 Subject: [PATCH 2/3] Make it clear - the fallthrough is intentional. --- sdk/messaging/azservicebus/messageSettler.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sdk/messaging/azservicebus/messageSettler.go b/sdk/messaging/azservicebus/messageSettler.go index e246b823c504..198fddaf3a64 100644 --- a/sdk/messaging/azservicebus/messageSettler.go +++ b/sdk/messaging/azservicebus/messageSettler.go @@ -60,6 +60,9 @@ func (ms *messageSettler) CompleteMessage(ctx context.Context, message *Received if shouldSettleOnReceiver(message) { ms.notifySettleOnLink(message) err = receiver.AcceptMessage(ctx, message.RawAMQPMessage.inner) + + // NOTE: we're intentionally falling through. If we failed to settle + // we might be able to attempt to settle against the management link. } if shouldSettleOnMgmtLink(err, message) { @@ -99,6 +102,9 @@ func (ms *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedM UndeliverableHere: false, Annotations: annotations, }) + + // NOTE: we're intentionally falling through. If we failed to settle + // we might be able to attempt to settle against the management link. } if shouldSettleOnMgmtLink(err, message) { @@ -148,6 +154,9 @@ func (ms *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMes UndeliverableHere: true, Annotations: annotations, }) + + // NOTE: we're intentionally falling through. If we failed to settle + // we might be able to attempt to settle against the management link. } if shouldSettleOnMgmtLink(err, message) { @@ -223,6 +232,9 @@ func (ms *messageSettler) DeadLetterMessage(ctx context.Context, message *Receiv } err = receiver.RejectMessage(ctx, message.RawAMQPMessage.inner, &amqpErr) + + // NOTE: we're intentionally falling through. If we failed to settle + // we might be able to attempt to settle against the management link. } if shouldSettleOnMgmtLink(err, message) { @@ -266,11 +278,16 @@ func newAnnotations(propertiesToModify map[string]any) amqp.Annotations { return annotations } +// shouldSettleOnReceiver determines if a message can be settled on an AMQP +// link or should only be settled on the management link. func shouldSettleOnReceiver(message *ReceivedMessage) bool { // deferred messages always go through the management link return !message.settleOnMgmtLink } +// shouldSettleOnMgmtLink checks if we can fallback to settling on the management +// link (if `err` was a connection/link failure) or if the message always needs +// to be settled on the management link, like with deferred messages. func shouldSettleOnMgmtLink(settlementErr error, message *ReceivedMessage) bool { if message.settleOnMgmtLink { // deferred messages always go through the management link From 55cfb32b96b99ff4a6c4df3b226783f1b081bf37 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 13 Feb 2024 00:59:23 +0000 Subject: [PATCH 3/3] Benchmarks shouldn't run during CI. --- .../azservicebus/internal/stress/tests/benchmarks/main_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go index 8dd88b60f80c..e802d06f0074 100644 --- a/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go +++ b/sdk/messaging/azservicebus/internal/stress/tests/benchmarks/main_test.go @@ -19,7 +19,8 @@ func TestMain(m *testing.M) { err := shared.LoadEnvironment() if err != nil { - log.Fatal(err) + log.Printf("Failed to load env file, benchmarks will not run: %s", err) + return } os.Exit(m.Run())