diff --git a/batch_test.go b/batch_test.go index 7e84e328..d4d2bc37 100644 --- a/batch_test.go +++ b/batch_test.go @@ -49,7 +49,7 @@ func TestEventBatch_Clear(t *testing.T) { ok, err := eb.Add(eventhub.NewEventFromString("Foo")) assert.True(t, ok) assert.NoError(t, err) - assert.Equal(t, 174, eb.Size()) + assert.Equal(t, 163, eb.Size()) eb.Clear() assert.Equal(t, 100, eb.Size()) diff --git a/event.go b/event.go index b7c6ab0c..6722792c 100644 --- a/event.go +++ b/event.go @@ -229,10 +229,20 @@ func newEvent(data []byte, msg *amqp.Message) (*Event, error) { } event.RawAMQPMessage.Properties.UserID = msg.Properties.UserID - event.RawAMQPMessage.Properties.Subject = msg.Properties.Subject + + if msg.Properties.Subject != nil { + event.RawAMQPMessage.Properties.Subject = *msg.Properties.Subject + } + event.RawAMQPMessage.Properties.CorrelationID = msg.Properties.CorrelationID - event.RawAMQPMessage.Properties.ContentEncoding = msg.Properties.ContentEncoding - event.RawAMQPMessage.Properties.ContentType = msg.Properties.ContentType + + if msg.Properties.ContentEncoding != nil { + event.RawAMQPMessage.Properties.ContentEncoding = *msg.Properties.ContentEncoding + } + + if msg.Properties.ContentType != nil { + event.RawAMQPMessage.Properties.ContentType = *msg.Properties.ContentType + } } if msg.Annotations != nil { diff --git a/event_test.go b/event_test.go index 1beeeb91..f0a01dbd 100644 --- a/event_test.go +++ b/event_test.go @@ -30,14 +30,18 @@ import ( // SOFTWARE func TestMessageConversion(t *testing.T) { + subject := "subject" + contentEncoding := "utf-75" + contentType := "application/octet-stream" + amqpMsg := &amqp.Message{ Properties: &amqp.MessageProperties{ MessageID: "messageID", UserID: []byte("userID"), CorrelationID: "correlationID", - Subject: "subject", - ContentEncoding: "utf-75", - ContentType: "application/octet-stream", + Subject: &subject, + ContentEncoding: &contentEncoding, + ContentType: &contentType, }, Annotations: amqp.Annotations{ "annotation1": "annotation1Value", diff --git a/go.mod b/go.mod index a6d423f9..f753f599 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,11 @@ module github.com/Azure/azure-event-hubs-go/v3 go 1.13 require ( - github.com/Azure/azure-amqp-common-go/v3 v3.2.1 + github.com/Azure/azure-amqp-common-go/v3 v3.2.3 github.com/Azure/azure-pipeline-go v0.1.9 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible github.com/Azure/azure-storage-blob-go v0.6.0 - github.com/Azure/go-amqp v0.16.0 + github.com/Azure/go-amqp v0.17.0 github.com/Azure/go-autorest/autorest v0.11.18 github.com/Azure/go-autorest/autorest/adal v0.9.13 github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 diff --git a/go.sum b/go.sum index 98457228..395f3f3e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ -github.com/Azure/azure-amqp-common-go/v3 v3.2.1 h1:uQyDk81yn5hTP1pW4Za+zHzy97/f4vDz9o1d/exI4j4= -github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI= +github.com/Azure/azure-amqp-common-go/v3 v3.2.2 h1:CJpxNAGxP7UBhDusRUoaOn0uOorQyAYhQYLnNgkRhlY= +github.com/Azure/azure-amqp-common-go/v3 v3.2.2/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI= +github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk= +github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9 h1:u7JFb9fFTE6Y/j8ae2VK33ePrRqJqoCM/IWkQdAZ+rg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= @@ -7,8 +9,9 @@ github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnz github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-storage-blob-go v0.6.0 h1:SEATKb3LIHcaSIX+E6/K4kJpwfuozFEsmt5rS56N6CE= github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= -github.com/Azure/go-amqp v0.16.0 h1:6mhxUxaKLjMtHlGqzeih/LKqjUPLZxbM6zwfz5/C4NQ= github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= +github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4= +github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= diff --git a/hub.go b/hub.go index 5c8ae5f0..b38e2338 100644 --- a/hub.go +++ b/hub.go @@ -26,6 +26,7 @@ package eventhub import ( "context" "encoding/xml" + "errors" "fmt" "io/ioutil" "net/http" @@ -779,7 +780,8 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) { } func isRecoverableCloseError(err error) bool { - return isConnectionClosed(err) || isSessionClosed(err) || err == amqp.ErrLinkDetached + var detachError *amqp.DetachError + return isConnectionClosed(err) || isSessionClosed(err) || errors.As(err, &detachError) } func isConnectionClosed(err error) bool { diff --git a/hub_test.go b/hub_test.go index 68110d56..04b37f03 100644 --- a/hub_test.go +++ b/hub_test.go @@ -768,7 +768,7 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) { } func TestIsRecoverableCloseError(t *testing.T) { - require.True(t, isRecoverableCloseError(amqp.ErrLinkDetached)) + require.True(t, isRecoverableCloseError(&amqp.DetachError{})) // if the caller closes a link we shouldn't reopen or create a new one to replace it require.False(t, isRecoverableCloseError(amqp.ErrLinkClosed)) diff --git a/sender_test.go b/sender_test.go index d0c77263..64ef9a02 100644 --- a/sender_test.go +++ b/sender_test.go @@ -71,7 +71,7 @@ func TestSenderRetries(t *testing.T) { recoverCalls = nil sender = &testAmqpSender{ sendErrors: []error{ - amqp.ErrLinkDetached, + &amqp.DetachError{}, amqp.ErrSessionClosed, errors.New("We'll never attempt to use this one since we ran out of retries")}, } @@ -85,7 +85,7 @@ func TestSenderRetries(t *testing.T) { assert.EqualValues(t, []recoveryCall{ { linkID: "sender-id", - err: amqp.ErrLinkDetached, + err: &amqp.DetachError{}, recover: true, }, { @@ -203,7 +203,7 @@ func TestSenderRetries(t *testing.T) { sender = &testAmqpSender{ sendErrors: []error{ amqp.ErrConnClosed, - amqp.ErrLinkDetached, + &amqp.DetachError{}, amqp.ErrSessionClosed, }, } @@ -219,7 +219,7 @@ func TestSenderRetries(t *testing.T) { }, { linkID: "sender-id", - err: amqp.ErrLinkDetached, + err: &amqp.DetachError{}, recover: true, }, { @@ -312,17 +312,19 @@ func TestRecoveryBlock1(t *testing.T) { defer cleanup() - sender.recoverWithExpectedLinkID(context.TODO(), "") + err := sender.recoverWithExpectedLinkID(context.TODO(), "") + require.NoError(t, err) }) t.Run("Matching link ID does recovery", func(t *testing.T) { cleanup, sender := createRecoveryBlock1Sender(t, func(s *sender) { - require.True(t, s.recovering, "s.recovering should be true since the lock is available and we have our expected link ID matches") + require.True(t, s.recovering, "s.recovering should be true since the lock is available and we our expected link ID matches") }) defer cleanup() - sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id") + err := sender.recoverWithExpectedLinkID(context.TODO(), "the-actual-link-id") + require.NoError(t, err) }) t.Run("Non-matching link ID skips recovery", func(t *testing.T) { @@ -332,7 +334,8 @@ func TestRecoveryBlock1(t *testing.T) { defer cleanup() - sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id") + err := sender.recoverWithExpectedLinkID(context.TODO(), "non-matching-link-id") + require.NoError(t, err) }) // TODO: can't quite test this one @@ -361,6 +364,10 @@ func (s *fakeSender) ID() string { return s.id } +func (s *fakeSender) LinkName() string { + return "the-actual-link-id" +} + func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error { return nil } @@ -391,6 +398,7 @@ func createRecoveryBlock1Sender(t *testing.T, afterBlock1 func(s *sender)) (func }} return func() { - require.EqualValues(t, recover(), "Panicking to exit before block 2") + val := recover() + require.EqualValues(t, "Panicking to exit before block 2", val) }, s }