Skip to content

Commit

Permalink
Update to handle use *amqp.DetachError instead of ErrLinkDetached (go…
Browse files Browse the repository at this point in the history
…-amqp change) (#245)

Updating to the latest go-amqp and azure-amqp-common-go

* go-amqp will return *amqp.DetachError now instead of ErrLinkDetached (which is being removed).
* Now that we can set fields to nil they can be properly omitted.

Misc: Fixing a unit test that was broken.
  • Loading branch information
richardpark-msft authored Dec 8, 2021
1 parent bb122ca commit 73b7c0f
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 23 deletions.
2 changes: 1 addition & 1 deletion batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEventBatch_Clear(t *testing.T) {
ok, err := eb.Add(eventhub.NewEventFromString("Foo"))
assert.True(t, ok)
assert.NoError(t, err)
assert.Equal(t, 174, eb.Size())
assert.Equal(t, 163, eb.Size())

eb.Clear()
assert.Equal(t, 100, eb.Size())
Expand Down
16 changes: 13 additions & 3 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,20 @@ func newEvent(data []byte, msg *amqp.Message) (*Event, error) {
}

event.RawAMQPMessage.Properties.UserID = msg.Properties.UserID
event.RawAMQPMessage.Properties.Subject = msg.Properties.Subject

if msg.Properties.Subject != nil {
event.RawAMQPMessage.Properties.Subject = *msg.Properties.Subject
}

event.RawAMQPMessage.Properties.CorrelationID = msg.Properties.CorrelationID
event.RawAMQPMessage.Properties.ContentEncoding = msg.Properties.ContentEncoding
event.RawAMQPMessage.Properties.ContentType = msg.Properties.ContentType

if msg.Properties.ContentEncoding != nil {
event.RawAMQPMessage.Properties.ContentEncoding = *msg.Properties.ContentEncoding
}

if msg.Properties.ContentType != nil {
event.RawAMQPMessage.Properties.ContentType = *msg.Properties.ContentType
}
}

if msg.Annotations != nil {
Expand Down
10 changes: 7 additions & 3 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ import (
// SOFTWARE

func TestMessageConversion(t *testing.T) {
subject := "subject"
contentEncoding := "utf-75"
contentType := "application/octet-stream"

amqpMsg := &amqp.Message{
Properties: &amqp.MessageProperties{
MessageID: "messageID",
UserID: []byte("userID"),
CorrelationID: "correlationID",
Subject: "subject",
ContentEncoding: "utf-75",
ContentType: "application/octet-stream",
Subject: &subject,
ContentEncoding: &contentEncoding,
ContentType: &contentType,
},
Annotations: amqp.Annotations{
"annotation1": "annotation1Value",
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/Azure/azure-event-hubs-go/v3
go 1.13

require (
github.com/Azure/azure-amqp-common-go/v3 v3.2.1
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
github.com/Azure/azure-pipeline-go v0.1.9
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
github.com/Azure/azure-storage-blob-go v0.6.0
github.com/Azure/go-amqp v0.16.0
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
github.com/Azure/azure-amqp-common-go/v3 v3.2.1 h1:uQyDk81yn5hTP1pW4Za+zHzy97/f4vDz9o1d/exI4j4=
github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
github.com/Azure/azure-amqp-common-go/v3 v3.2.2 h1:CJpxNAGxP7UBhDusRUoaOn0uOorQyAYhQYLnNgkRhlY=
github.com/Azure/azure-amqp-common-go/v3 v3.2.2/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk=
github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.6.0 h1:SEATKb3LIHcaSIX+E6/K4kJpwfuozFEsmt5rS56N6CE=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-amqp v0.16.0 h1:6mhxUxaKLjMtHlGqzeih/LKqjUPLZxbM6zwfz5/C4NQ=
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
Expand Down
4 changes: 3 additions & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package eventhub
import (
"context"
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -779,7 +780,8 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
}

func isRecoverableCloseError(err error) bool {
return isConnectionClosed(err) || isSessionClosed(err) || err == amqp.ErrLinkDetached
var detachError *amqp.DetachError
return isConnectionClosed(err) || isSessionClosed(err) || errors.As(err, &detachError)
}

func isConnectionClosed(err error) bool {
Expand Down
2 changes: 1 addition & 1 deletion hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) {
}

func TestIsRecoverableCloseError(t *testing.T) {
require.True(t, isRecoverableCloseError(amqp.ErrLinkDetached))
require.True(t, isRecoverableCloseError(&amqp.DetachError{}))

// if the caller closes a link we shouldn't reopen or create a new one to replace it
require.False(t, isRecoverableCloseError(amqp.ErrLinkClosed))
Expand Down
26 changes: 17 additions & 9 deletions sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrLinkDetached,
&amqp.DetachError{},
amqp.ErrSessionClosed,
errors.New("We'll never attempt to use this one since we ran out of retries")},
}
Expand All @@ -85,7 +85,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: amqp.ErrLinkDetached,
err: &amqp.DetachError{},
recover: true,
},
{
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestSenderRetries(t *testing.T) {
sender = &testAmqpSender{
sendErrors: []error{
amqp.ErrConnClosed,
amqp.ErrLinkDetached,
&amqp.DetachError{},
amqp.ErrSessionClosed,
},
}
Expand All @@ -219,7 +219,7 @@ func TestSenderRetries(t *testing.T) {
},
{
linkID: "sender-id",
err: amqp.ErrLinkDetached,
err: &amqp.DetachError{},
recover: true,
},
{
Expand Down Expand Up @@ -312,17 +312,19 @@ func TestRecoveryBlock1(t *testing.T) {

defer cleanup()

sender.recoverWithExpectedLinkID(context.TODO(), "")
err := sender.recoverWithExpectedLinkID(context.TODO(), "")
require.NoError(t, err)
})

t.Run("Matching link ID does recovery", func(t *testing.T) {
cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) {
require.True(t, s.recovering, "s.recovering should be true since the lock is available and we have our expected link ID matches")
require.True(t, s.recovering, "s.recovering should be true since the lock is available and we our expected link ID matches")
})

defer cleanup()

sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
err := sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id")
require.NoError(t, err)
})

t.Run("Non-matching link ID skips recovery", func(t *testing.T) {
Expand All @@ -332,7 +334,8 @@ func TestRecoveryBlock1(t *testing.T) {

defer cleanup()

sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id")
err := sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id")
require.NoError(t, err)
})

// TODO: can't quite test this one
Expand Down Expand Up @@ -361,6 +364,10 @@ func (s *fakeSender) ID() string {
return s.id
}

func (s *fakeSender) LinkName() string {
return "the-actual-link-id"
}

func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
return nil
}
Expand Down Expand Up @@ -391,6 +398,7 @@ func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func
}}

return func() {
require.EqualValues(t, recover(), "Panicking to exit before block 2")
val := recover()
require.EqualValues(t, "Panicking to exit before block 2", val)
}, s
}

0 comments on commit 73b7c0f

Please sign in to comment.