From 4efb39a75a4451da5772fa350401731504543e92 Mon Sep 17 00:00:00 2001 From: ksaiki Date: Tue, 7 May 2024 21:46:19 +0900 Subject: [PATCH] Fix MessageAttributes propagation --- app/gosqs/message_attributes.go | 38 ----------- app/gosqs/send_message.go | 36 ++++++++++- app/models/models.go | 22 +++---- smoke_tests/sqs_send_message_test.go | 95 ++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 52 deletions(-) diff --git a/app/gosqs/message_attributes.go b/app/gosqs/message_attributes.go index fd78a1e4..1badeb7e 100644 --- a/app/gosqs/message_attributes.go +++ b/app/gosqs/message_attributes.go @@ -1,47 +1,9 @@ package gosqs import ( - "fmt" - "net/http" - "github.com/Admiral-Piett/goaws/app" - log "github.com/sirupsen/logrus" ) -func extractMessageAttributes(req *http.Request, prefix string) map[string]app.MessageAttributeValue { - attributes := make(map[string]app.MessageAttributeValue) - if prefix != "" { - prefix += "." - } - - for i := 1; true; i++ { - name := req.FormValue(fmt.Sprintf("%sMessageAttribute.%d.Name", prefix, i)) - if name == "" { - break - } - - dataType := req.FormValue(fmt.Sprintf("%sMessageAttribute.%d.Value.DataType", prefix, i)) - if dataType == "" { - log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name) - continue - } - - // StringListValue and BinaryListValue is currently not implemented - for _, valueKey := range [...]string{"StringValue", "BinaryValue"} { - value := req.FormValue(fmt.Sprintf("%sMessageAttribute.%d.Value.%s", prefix, i, valueKey)) - if value != "" { - attributes[name] = app.MessageAttributeValue{name, dataType, value, valueKey} - } - } - - if _, ok := attributes[name]; !ok { - log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name) - } - } - - return attributes -} - func getMessageAttributeResult(a *app.MessageAttributeValue) *app.ResultMessageAttribute { v := &app.ResultMessageAttributeValue{ DataType: a.DataType, diff --git a/app/gosqs/send_message.go b/app/gosqs/send_message.go index fa0edf12..74d0a248 100644 --- a/app/gosqs/send_message.go +++ b/app/gosqs/send_message.go @@ -28,7 +28,7 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { messageBody := requestBody.MessageBody messageGroupID := requestBody.MessageGroupId messageDeduplicationID := requestBody.MessageDeduplicationId - messageAttributes := extractMessageAttributes(req, "") + messageAttributes := requestBody.MessageAttributes queueUrl := getQueueFromPath(requestBody.QueueUrl, req.URL.String()) @@ -60,8 +60,9 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { log.Println("Putting Message in Queue:", queueName) msg := app.Message{MessageBody: []byte(messageBody)} if len(messageAttributes) > 0 { - msg.MessageAttributes = messageAttributes - msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes) + oldStyleMessageAttributes := convertToOldMessageAttributeValueStructure(messageAttributes) + msg.MessageAttributes = oldStyleMessageAttributes + msg.MD5OfMessageAttributes = common.HashAttributes(oldStyleMessageAttributes) } msg.MD5OfMessageBody = common.GetMD5Hash(messageBody) msg.Uuid, _ = common.NewUUID() @@ -101,3 +102,32 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { return http.StatusOK, respStruct } + +// TODO: +// Refactor internal MessageAttribute model between SendMessage and ReceiveMessage +// from app.MessageAttributeValue(old) to models.MessageAttributeValue(new) and remove this temporary function. +func convertToOldMessageAttributeValueStructure(newValues map[string]models.MessageAttributeValue) map[string]app.MessageAttributeValue { + attributes := make(map[string]app.MessageAttributeValue) + + for name, entry := range newValues { + // StringListValue and BinaryListValue is currently not implemented + // Please refer app/gosqs/message_attributes.go + value := "" + valueKey := "" + if entry.StringValue != "" { + value = entry.StringValue + valueKey = "StringValue" + } else if entry.BinaryValue != "" { + value = entry.BinaryValue + valueKey = "BinaryValue" + } + attributes[name] = app.MessageAttributeValue{ + Name: name, + DataType: entry.DataType, + Value: value, + ValueKey: valueKey, + } + } + + return attributes +} diff --git a/app/models/models.go b/app/models/models.go index 2f0235ee..c53ec346 100644 --- a/app/models/models.go +++ b/app/models/models.go @@ -157,21 +157,21 @@ func (r *ListQueueRequest) SetAttributesFromForm(values url.Values) { func NewSendMessageRequest() *SendMessageRequest { return &SendMessageRequest{ - MessageAttributes: make(map[string]MessageAttributes), - MessageSystemAttributes: make(map[string]MessageAttributes), + MessageAttributes: make(map[string]MessageAttributeValue), + MessageSystemAttributes: make(map[string]MessageAttributeValue), } } type SendMessageRequest struct { - DelaySeconds int `json:"Del1aySeconds" schema:"DelaySeconds"` - MessageAttributes map[string]MessageAttributes `json:"MessageAttributes" schema:"MessageAttributes"` - MessageBody string `json:"MessageBody" schema:"MessageBody"` - MessageDeduplicationId string `json:"MessageDeduplicationId" schema:"MessageDeduplicationId"` - MessageGroupId string `json:"MessageGroupId" schema:"MessageGroupId"` - MessageSystemAttributes map[string]MessageAttributes `json:"MessageSystemAttributes" schema:"MessageSystemAttributes"` - QueueUrl string `json:"QueueUrl" schema:"QueueUrl"` + DelaySeconds int `json:"Del1aySeconds" schema:"DelaySeconds"` + MessageAttributes map[string]MessageAttributeValue `json:"MessageAttributes" schema:"MessageAttributes"` + MessageBody string `json:"MessageBody" schema:"MessageBody"` + MessageDeduplicationId string `json:"MessageDeduplicationId" schema:"MessageDeduplicationId"` + MessageGroupId string `json:"MessageGroupId" schema:"MessageGroupId"` + MessageSystemAttributes map[string]MessageAttributeValue `json:"MessageSystemAttributes" schema:"MessageSystemAttributes"` + QueueUrl string `json:"QueueUrl" schema:"QueueUrl"` } -type MessageAttributes struct { +type MessageAttributeValue struct { BinaryListValues []string `json:"BinaryListValues"` // goaws does not supported yet BinaryValue string `json:"BinaryValue"` DataType string `json:"DataType"` @@ -197,7 +197,7 @@ func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) { stringValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i)) binaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i)) - r.MessageAttributes[name] = MessageAttributes{ + r.MessageAttributes[name] = MessageAttributeValue{ DataType: dataType, StringValue: stringValue, BinaryValue: binaryValue, diff --git a/smoke_tests/sqs_send_message_test.go b/smoke_tests/sqs_send_message_test.go index 02548201..58610bed 100644 --- a/smoke_tests/sqs_send_message_test.go +++ b/smoke_tests/sqs_send_message_test.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" + sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/gavv/httpexpect/v2" "github.com/stretchr/testify/assert" ) @@ -79,3 +80,97 @@ func Test_SendMessageV1_json_no_attributes(t *testing.T) { } } } + +func Test_SendMessageV1_json_with_attributes(t *testing.T) { + server := generateServer() + defer func() { + server.Close() + utils.ResetResources() + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + sqsClient := sqs.NewFromConfig(sdkConfig) + sdkResponse, _ := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: &af.QueueName, + }) + targetQueueUrl := sdkResponse.QueueUrl + + // Target test: send a message + targetMessageBody := "Test_SendMessageV1_json_with_attributes" + attr1_dataType := "String" + attr1_value := "attr1_value" + attr2_dataType := "String" + attr2_value := "attr1_value" + sendMessageOutput, _ := sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{ + QueueUrl: targetQueueUrl, + MessageBody: &targetMessageBody, + MessageAttributes: map[string]sqstypes.MessageAttributeValue{ + "attr1": { + DataType: &attr1_dataType, + StringValue: &attr1_value, + }, + "attr2": { + DataType: &attr2_dataType, + StringValue: &attr2_value, + }, + }, + }) + assert.NotNil(t, sendMessageOutput.MessageId) + + // Assert 1 message in the queue + getQueueAttributesBodyXML := struct { + Action string `xml:"Action"` + Version string `xml:"Version"` + Attribute1 string `xml:"AttributeName.1"` + QueueUrl string `xml:"QueueUrl"` + }{ + Action: "GetQueueAttributes", + Version: "2012-11-05", + Attribute1: "All", + QueueUrl: *targetQueueUrl, + } + e := httpexpect.Default(t, server.URL) + r := e.POST("/"). + WithForm(getQueueAttributesBodyXML). + Expect(). + Status(http.StatusOK). + Body().Raw() + r2 := app.GetQueueAttributesResponse{} + xml.Unmarshal([]byte(r), &r2) + for _, attr := range r2.Result.Attrs { + if attr.Name == "ApproximateNumberOfMessages" { + assert.Equal(t, "1", attr.Value) + } + } + + // Receive message and check attribute + receiveMessageBodyXML := struct { + Action string `xml:"Action"` + Version string `xml:"Version"` + Attribute1 string `xml:"AttributeName.1"` + QueueUrl string `xml:"QueueUrl"` + }{ + Action: "ReceiveMessage", + Version: "2012-11-05", + QueueUrl: *targetQueueUrl, + } + r = e.POST("/"). + WithForm(receiveMessageBodyXML). + Expect(). + Status(http.StatusOK). + Body().Raw() + r3 := app.ReceiveMessageResponse{} + xml.Unmarshal([]byte(r), &r3) + message := r3.Result.Message[0] + assert.Equal(t, targetMessageBody, string(message.Body)) + assert.Equal(t, 2, len(message.MessageAttributes)) +} + +func Test_SendMessageV1_json_MaximumMessageSize_TooBig(t *testing.T) { + // TODO +} + +func Test_SendMessageV1_json_QueueNotExistant(t *testing.T) { + // TODO +}