From fcdd20863ddaf0f07af61d497a41eba7803a7af9 Mon Sep 17 00:00:00 2001 From: Devin Humphreys Date: Mon, 3 Jun 2024 16:41:31 -0400 Subject: [PATCH] Add PurgeQueueV1 for JSON support --- app/gosqs/get_queue_attributes.go | 3 +- app/gosqs/gosqs.go | 30 ------- app/gosqs/purge_queue.go | 43 +++++++++ app/gosqs/purge_queue_test.go | 125 +++++++++++++++++++++++++++ app/gosqs/send_message.go | 2 +- app/models/models.go | 10 +++ app/models/responses.go | 14 +++ app/router/router.go | 2 +- app/router/router_test.go | 2 +- app/sqs_messages.go | 6 -- smoke_tests/sqs_create_queue_test.go | 3 - smoke_tests/sqs_purge_queue_test.go | 125 +++++++++++++++++++++++++++ 12 files changed, 321 insertions(+), 44 deletions(-) create mode 100644 app/gosqs/purge_queue.go create mode 100644 app/gosqs/purge_queue_test.go create mode 100644 smoke_tests/sqs_purge_queue_test.go diff --git a/app/gosqs/get_queue_attributes.go b/app/gosqs/get_queue_attributes.go index 280c819a..32af7dff 100644 --- a/app/gosqs/get_queue_attributes.go +++ b/app/gosqs/get_queue_attributes.go @@ -61,10 +61,10 @@ func GetQueueAttributesV1(req *http.Request) (int, interfaces.AbstractResponseBo queueAttributes := make([]models.Attribute, 0, 0) app.SyncQueues.RLock() + defer app.SyncQueues.RUnlock() queue, ok := app.SyncQueues.Queues[queueName] if !ok { log.Errorf("Get Queue URL: %s queue does not exist!!!", queueName) - app.SyncQueues.RUnlock() return createErrorResponseV1(ErrInvalidParameterValue.Type) } @@ -126,7 +126,6 @@ func GetQueueAttributesV1(req *http.Request) (int, interfaces.AbstractResponseBo attr := models.Attribute{Name: "RedrivePolicy", Value: fmt.Sprintf(`{"maxReceiveCount":"%d", "deadLetterTargetArn":"%s"}`, queue.MaxReceiveCount, queue.DeadLetterQueue.Arn)} queueAttributes = append(queueAttributes, attr) } - app.SyncQueues.RUnlock() respStruct := models.GetQueueAttributesResponse{ models.BASE_XMLNS, diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index 8a678bf7..033b3179 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -362,36 +362,6 @@ func DeleteQueue(w http.ResponseWriter, req *http.Request) { } } -func PurgeQueue(w http.ResponseWriter, req *http.Request) { - // Sent response type - w.Header().Set("Content-Type", "application/xml") - - // Retrieve FormValues required - queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String()) - - uriSegments := strings.Split(queueUrl, "/") - queueName := uriSegments[len(uriSegments)-1] - - log.Println("Purging Queue:", queueName) - - app.SyncQueues.Lock() - if _, ok := app.SyncQueues.Queues[queueName]; ok { - app.SyncQueues.Queues[queueName].Messages = nil - app.SyncQueues.Queues[queueName].Duplicates = make(map[string]time.Time) - respStruct := app.PurgeQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}} - enc := xml.NewEncoder(w) - enc.Indent(" ", " ") - if err := enc.Encode(respStruct); err != nil { - log.Printf("error: %v\n", err) - createErrorResponse(w, req, "GeneralError") - } - } else { - log.Println("Purge Queue:", queueName, ", queue does not exist!!!") - createErrorResponse(w, req, "QueueNotFound") - } - app.SyncQueues.Unlock() -} - func GetQueueUrl(w http.ResponseWriter, req *http.Request) { // Sent response type w.Header().Set("Content-Type", "application/xml") diff --git a/app/gosqs/purge_queue.go b/app/gosqs/purge_queue.go new file mode 100644 index 00000000..b2562f4a --- /dev/null +++ b/app/gosqs/purge_queue.go @@ -0,0 +1,43 @@ +package gosqs + +import ( + "net/http" + "strings" + "time" + + "github.com/Admiral-Piett/goaws/app/interfaces" + "github.com/Admiral-Piett/goaws/app/models" + "github.com/Admiral-Piett/goaws/app/utils" + + "github.com/Admiral-Piett/goaws/app" + log "github.com/sirupsen/logrus" +) + +func PurgeQueueV1(req *http.Request) (int, interfaces.AbstractResponseBody) { + requestBody := models.NewPurgeQueueRequest() + ok := utils.REQUEST_TRANSFORMER(requestBody, req, false) + if !ok { + log.Error("Invalid Request - PurgeQueueV1") + return createErrorResponseV1(ErrInvalidParameterValue.Type) + } + + uriSegments := strings.Split(requestBody.QueueUrl, "/") + queueName := uriSegments[len(uriSegments)-1] + + app.SyncQueues.Lock() + defer app.SyncQueues.Unlock() + if _, ok := app.SyncQueues.Queues[queueName]; !ok { + log.Errorf("Purge Queue: %s, queue does not exist!!!", queueName) + return createErrorResponseV1("QueueNotFound") + } + + log.Infof("Purging Queue: %s", queueName) + app.SyncQueues.Queues[queueName].Messages = nil + app.SyncQueues.Queues[queueName].Duplicates = make(map[string]time.Time) + + respStruct := models.PurgeQueueResponse{ + Xmlns: models.BASE_XMLNS, + Metadata: models.BASE_RESPONSE_METADATA, + } + return http.StatusOK, respStruct +} diff --git a/app/gosqs/purge_queue_test.go b/app/gosqs/purge_queue_test.go new file mode 100644 index 00000000..d5a14c4d --- /dev/null +++ b/app/gosqs/purge_queue_test.go @@ -0,0 +1,125 @@ +package gosqs + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/Admiral-Piett/goaws/app/conf" + + "github.com/Admiral-Piett/goaws/app" + "github.com/Admiral-Piett/goaws/app/fixtures" + "github.com/Admiral-Piett/goaws/app/interfaces" + "github.com/Admiral-Piett/goaws/app/models" + "github.com/Admiral-Piett/goaws/app/utils" + "github.com/stretchr/testify/assert" +) + +func TestPurgeQueueV1_success(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + utils.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PurgeQueueRequest) + *v = models.PurgeQueueRequest{ + QueueUrl: fmt.Sprintf("%s/%s", fixtures.BASE_URL, "unit-queue1"), + } + return true + } + + // Put a message on the queue + targetQueue := app.SyncQueues.Queues["unit-queue1"] + app.SyncQueues.Lock() + targetQueue.Messages = []app.Message{app.Message{}} + targetQueue.Duplicates = map[string]time.Time{ + "dedupe-id": time.Now(), + } + app.SyncQueues.Unlock() + + expectedResponse := models.PurgeQueueResponse{ + Xmlns: models.BASE_XMLNS, + Metadata: models.BASE_RESPONSE_METADATA, + } + + _, r := utils.GenerateRequestInfo("POST", "/", nil, true) + code, response := PurgeQueueV1(r) + + assert.Equal(t, http.StatusOK, code) + assert.Equal(t, expectedResponse, response) + + assert.Nil(t, targetQueue.Messages) + assert.Equal(t, map[string]time.Time{}, targetQueue.Duplicates) +} + +func TestPurgeQueueV1_success_no_messages_on_queue(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + utils.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PurgeQueueRequest) + *v = models.PurgeQueueRequest{ + QueueUrl: fmt.Sprintf("%s/%s", fixtures.BASE_URL, "unit-queue1"), + } + return true + } + + expectedResponse := models.PurgeQueueResponse{ + Xmlns: models.BASE_XMLNS, + Metadata: models.BASE_RESPONSE_METADATA, + } + + _, r := utils.GenerateRequestInfo("POST", "/", nil, true) + code, response := PurgeQueueV1(r) + + assert.Equal(t, http.StatusOK, code) + assert.Equal(t, expectedResponse, response) + + targetQueue := app.SyncQueues.Queues["unit-queue1"] + assert.Nil(t, targetQueue.Messages) + assert.Equal(t, map[string]time.Time{}, targetQueue.Duplicates) +} + +func TestPurgeQueueV1_request_transformer_error(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + utils.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + return false + } + + _, r := utils.GenerateRequestInfo("POST", "/", nil, true) + code, _ := PurgeQueueV1(r) + + assert.Equal(t, http.StatusBadRequest, code) +} + +func TestPurgeQueueV1_requested_queue_does_not_exist(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + utils.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PurgeQueueRequest) + *v = models.PurgeQueueRequest{ + QueueUrl: fmt.Sprintf("%s/%s", fixtures.BASE_URL, "garbage"), + } + return true + } + + _, r := utils.GenerateRequestInfo("POST", "/", nil, true) + code, _ := PurgeQueueV1(r) + + assert.Equal(t, http.StatusBadRequest, code) +} diff --git a/app/gosqs/send_message.go b/app/gosqs/send_message.go index a20931e5..e362019d 100644 --- a/app/gosqs/send_message.go +++ b/app/gosqs/send_message.go @@ -21,7 +21,7 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { requestBody := models.NewSendMessageRequest() ok := utils.REQUEST_TRANSFORMER(requestBody, req, false) if !ok { - log.Error("Invalid Request - CreateQueueV1") + log.Error("Invalid Request - SendMessageV1") return createErrorResponseV1(ErrInvalidParameterValue.Type) } messageBody := requestBody.MessageBody diff --git a/app/models/models.go b/app/models/models.go index 2f4ad18a..dcba66dc 100644 --- a/app/models/models.go +++ b/app/models/models.go @@ -445,3 +445,13 @@ type DeleteMessageRequest struct { } func (r *DeleteMessageRequest) SetAttributesFromForm(values url.Values) {} + +func NewPurgeQueueRequest() *PurgeQueueRequest { + return &PurgeQueueRequest{} +} + +type PurgeQueueRequest struct { + QueueUrl string `json:"QueueUrl" schema:"QueueUrl"` +} + +func (r *PurgeQueueRequest) SetAttributesFromForm(values url.Values) {} diff --git a/app/models/responses.go b/app/models/responses.go index 38bd3e5f..a3f999d8 100644 --- a/app/models/responses.go +++ b/app/models/responses.go @@ -236,3 +236,17 @@ func (r SetQueueAttributesResponse) GetResult() interface{} { func (r SetQueueAttributesResponse) GetRequestId() string { return r.Metadata.RequestId } + +/*** Purge Queue Response */ +type PurgeQueueResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Metadata app.ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +func (r PurgeQueueResponse) GetResult() interface{} { + return nil +} + +func (r PurgeQueueResponse) GetRequestId() string { + return r.Metadata.RequestId +} diff --git a/app/router/router.go b/app/router/router.go index 4eac5bd4..ae8fc821 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -71,6 +71,7 @@ var routingTableV1 = map[string]func(r *http.Request) (int, interfaces.AbstractR "ReceiveMessage": sqs.ReceiveMessageV1, "ChangeMessageVisibility": sqs.ChangeMessageVisibilityV1, "DeleteMessage": sqs.DeleteMessageV1, + "PurgeQueue": sqs.PurgeQueueV1, } var routingTable = map[string]http.HandlerFunc{ @@ -78,7 +79,6 @@ var routingTable = map[string]http.HandlerFunc{ "SendMessageBatch": sqs.SendMessageBatch, "DeleteMessageBatch": sqs.DeleteMessageBatch, "GetQueueUrl": sqs.GetQueueUrl, - "PurgeQueue": sqs.PurgeQueue, "DeleteQueue": sqs.DeleteQueue, // SNS diff --git a/app/router/router_test.go b/app/router/router_test.go index e703bae8..e53f1956 100644 --- a/app/router/router_test.go +++ b/app/router/router_test.go @@ -268,13 +268,13 @@ func TestActionHandler_v0_xml(t *testing.T) { "ReceiveMessage": sqs.ReceiveMessageV1, "DeleteMessage": sqs.DeleteMessageV1, "ChangeMessageVisibility": sqs.ChangeMessageVisibilityV1, + "PurgeQueue": sqs.PurgeQueueV1, } routingTable = map[string]http.HandlerFunc{ // SQS "SendMessageBatch": sqs.SendMessageBatch, "DeleteMessageBatch": sqs.DeleteMessageBatch, "GetQueueUrl": sqs.GetQueueUrl, - "PurgeQueue": sqs.PurgeQueue, "DeleteQueue": sqs.DeleteQueue, // SNS diff --git a/app/sqs_messages.go b/app/sqs_messages.go index 60156d37..976bfec0 100644 --- a/app/sqs_messages.go +++ b/app/sqs_messages.go @@ -59,12 +59,6 @@ type SendMessageBatchResponse struct { Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` } -/*** Purge Queue Response */ -type PurgeQueueResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} - /*** Get Queue Url Response */ type GetQueueUrlResult struct { QueueUrl string `xml:"QueueUrl,omitempty"` diff --git a/smoke_tests/sqs_create_queue_test.go b/smoke_tests/sqs_create_queue_test.go index 1789badc..b7c0c303 100644 --- a/smoke_tests/sqs_create_queue_test.go +++ b/smoke_tests/sqs_create_queue_test.go @@ -27,9 +27,6 @@ import ( "github.com/gavv/httpexpect/v2" ) -// TODO - Is there a way to also capture the defaults we set and/or load from the config here? (review the xml -// code below) -// NOTE: Actually I think you can just adjust the app.CurrentEnvironment memory space...it travels across tests it seems. func Test_CreateQueueV1_json_no_attributes(t *testing.T) { server := generateServer() defer func() { diff --git a/smoke_tests/sqs_purge_queue_test.go b/smoke_tests/sqs_purge_queue_test.go new file mode 100644 index 00000000..d4d6db5e --- /dev/null +++ b/smoke_tests/sqs_purge_queue_test.go @@ -0,0 +1,125 @@ +package smoke_tests + +import ( + "context" + "encoding/xml" + "fmt" + "net/http" + "testing" + "time" + + "github.com/Admiral-Piett/goaws/app/models" + "github.com/gavv/httpexpect/v2" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" + + "github.com/Admiral-Piett/goaws/app/utils" + + "github.com/Admiral-Piett/goaws/app" + "github.com/stretchr/testify/assert" + + af "github.com/Admiral-Piett/goaws/app/fixtures" +) + +func Test_PurgeQueueV1_json(t *testing.T) { + defaultEnvironment := app.CurrentEnvironment + app.CurrentEnvironment = app.Environment{ + EnableDuplicates: true, + } + server := generateServer() + defer func() { + server.Close() + utils.ResetApp() + app.CurrentEnvironment = defaultEnvironment + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + sqsClient := sqs.NewFromConfig(sdkConfig) + + qName := fmt.Sprintf("%s.fifo", af.QueueName) + sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: &qName, + }) + + messageBody := "test-message" + dedupeId := "dedupe-id" + sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{ + QueueUrl: &qName, + MessageBody: &messageBody, + MessageDeduplicationId: &dedupeId, + }) + + sdkResponse, err := sqsClient.PurgeQueue(context.TODO(), &sqs.PurgeQueueInput{ + QueueUrl: &qName, + }) + + assert.Nil(t, err) + assert.NotNil(t, sdkResponse) + + app.SyncQueues.Lock() + defer app.SyncQueues.Unlock() + targetQueue := app.SyncQueues.Queues[qName] + assert.Nil(t, targetQueue.Messages) + assert.Equal(t, map[string]time.Time{}, targetQueue.Duplicates) +} + +func Test_PurgeQueueV1_xml(t *testing.T) { + defaultEnvironment := app.CurrentEnvironment + app.CurrentEnvironment = app.Environment{ + EnableDuplicates: true, + } + server := generateServer() + defer func() { + server.Close() + utils.ResetApp() + app.CurrentEnvironment = defaultEnvironment + }() + + e := httpexpect.Default(t, server.URL) + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + sqsClient := sqs.NewFromConfig(sdkConfig) + + qName := fmt.Sprintf("%s.fifo", af.QueueName) + sdkResponse, _ := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ + QueueName: &qName, + }) + + messageBody := "test-message" + dedupeId := "dedupe-id" + sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{ + QueueUrl: &qName, + MessageBody: &messageBody, + MessageDeduplicationId: &dedupeId, + }) + + r := e.POST("/"). + WithForm(struct { + Action string `xml:"Action"` + QueueUrl string `xml:"QueueUrl"` + }{ + Action: "PurgeQueue", + QueueUrl: *sdkResponse.QueueUrl, + }). + Expect(). + Status(http.StatusOK). + Body().Raw() + + expected := models.PurgeQueueResponse{ + Xmlns: models.BASE_XMLNS, + Metadata: models.BASE_RESPONSE_METADATA, + } + response := models.PurgeQueueResponse{} + xml.Unmarshal([]byte(r), &response) + assert.Equal(t, expected, response) + + app.SyncQueues.Lock() + defer app.SyncQueues.Unlock() + targetQueue := app.SyncQueues.Queues[qName] + assert.Nil(t, targetQueue.Messages) + assert.Equal(t, map[string]time.Time{}, targetQueue.Duplicates) +}