Skip to content

Commit

Permalink
Add PurgeQueueV1 for JSON support
Browse files Browse the repository at this point in the history
  • Loading branch information
dhumphreys01 authored and Admiral-Piett committed Sep 20, 2024
1 parent b3d5762 commit fcdd208
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 44 deletions.
3 changes: 1 addition & 2 deletions app/gosqs/get_queue_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ func GetQueueAttributesV1(req *http.Request) (int, interfaces.AbstractResponseBo
queueAttributes := make([]models.Attribute, 0, 0)

app.SyncQueues.RLock()
defer app.SyncQueues.RUnlock()
queue, ok := app.SyncQueues.Queues[queueName]
if !ok {
log.Errorf("Get Queue URL: %s queue does not exist!!!", queueName)
app.SyncQueues.RUnlock()
return createErrorResponseV1(ErrInvalidParameterValue.Type)
}

Expand Down Expand Up @@ -126,7 +126,6 @@ func GetQueueAttributesV1(req *http.Request) (int, interfaces.AbstractResponseBo
attr := models.Attribute{Name: "RedrivePolicy", Value: fmt.Sprintf(`{"maxReceiveCount":"%d", "deadLetterTargetArn":"%s"}`, queue.MaxReceiveCount, queue.DeadLetterQueue.Arn)}
queueAttributes = append(queueAttributes, attr)
}
app.SyncQueues.RUnlock()

respStruct := models.GetQueueAttributesResponse{
models.BASE_XMLNS,
Expand Down
30 changes: 0 additions & 30 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,36 +362,6 @@ func DeleteQueue(w http.ResponseWriter, req *http.Request) {
}
}

func PurgeQueue(w http.ResponseWriter, req *http.Request) {
// Sent response type
w.Header().Set("Content-Type", "application/xml")

// Retrieve FormValues required
queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String())

uriSegments := strings.Split(queueUrl, "/")
queueName := uriSegments[len(uriSegments)-1]

log.Println("Purging Queue:", queueName)

app.SyncQueues.Lock()
if _, ok := app.SyncQueues.Queues[queueName]; ok {
app.SyncQueues.Queues[queueName].Messages = nil
app.SyncQueues.Queues[queueName].Duplicates = make(map[string]time.Time)
respStruct := app.PurgeQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}}
enc := xml.NewEncoder(w)
enc.Indent(" ", " ")
if err := enc.Encode(respStruct); err != nil {
log.Printf("error: %v\n", err)
createErrorResponse(w, req, "GeneralError")
}
} else {
log.Println("Purge Queue:", queueName, ", queue does not exist!!!")
createErrorResponse(w, req, "QueueNotFound")
}
app.SyncQueues.Unlock()
}

func GetQueueUrl(w http.ResponseWriter, req *http.Request) {
// Sent response type
w.Header().Set("Content-Type", "application/xml")
Expand Down
43 changes: 43 additions & 0 deletions app/gosqs/purge_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gosqs

import (
"net/http"
"strings"
"time"

"github.com/Admiral-Piett/goaws/app/interfaces"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"

"github.com/Admiral-Piett/goaws/app"
log "github.com/sirupsen/logrus"
)

func PurgeQueueV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
requestBody := models.NewPurgeQueueRequest()
ok := utils.REQUEST_TRANSFORMER(requestBody, req, false)
if !ok {
log.Error("Invalid Request - PurgeQueueV1")
return createErrorResponseV1(ErrInvalidParameterValue.Type)
}

uriSegments := strings.Split(requestBody.QueueUrl, "/")
queueName := uriSegments[len(uriSegments)-1]

app.SyncQueues.Lock()
defer app.SyncQueues.Unlock()
if _, ok := app.SyncQueues.Queues[queueName]; !ok {
log.Errorf("Purge Queue: %s, queue does not exist!!!", queueName)
return createErrorResponseV1("QueueNotFound")
}

log.Infof("Purging Queue: %s", queueName)
app.SyncQueues.Queues[queueName].Messages = nil
app.SyncQueues.Queues[queueName].Duplicates = make(map[string]time.Time)

respStruct := models.PurgeQueueResponse{
Xmlns: models.BASE_XMLNS,
Metadata: models.BASE_RESPONSE_METADATA,
}
return http.StatusOK, respStruct
}
125 changes: 125 additions & 0 deletions app/gosqs/purge_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package gosqs

import (
"fmt"
"net/http"
"testing"
"time"

"github.com/Admiral-Piett/goaws/app/conf"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/fixtures"
"github.com/Admiral-Piett/goaws/app/interfaces"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/stretchr/testify/assert"
)

func TestPurgeQueueV1_success(t *testing.T) {
conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests")
defer func() {
utils.ResetApp()
utils.REQUEST_TRANSFORMER = utils.TransformRequest
}()

utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) {
v := resultingStruct.(*models.PurgeQueueRequest)
*v = models.PurgeQueueRequest{
QueueUrl: fmt.Sprintf("%s/%s", fixtures.BASE_URL, "unit-queue1"),
}
return true
}

// Put a message on the queue
targetQueue := app.SyncQueues.Queues["unit-queue1"]
app.SyncQueues.Lock()
targetQueue.Messages = []app.Message{app.Message{}}
targetQueue.Duplicates = map[string]time.Time{
"dedupe-id": time.Now(),
}
app.SyncQueues.Unlock()

expectedResponse := models.PurgeQueueResponse{
Xmlns: models.BASE_XMLNS,
Metadata: models.BASE_RESPONSE_METADATA,
}

_, r := utils.GenerateRequestInfo("POST", "/", nil, true)
code, response := PurgeQueueV1(r)

assert.Equal(t, http.StatusOK, code)
assert.Equal(t, expectedResponse, response)

assert.Nil(t, targetQueue.Messages)
assert.Equal(t, map[string]time.Time{}, targetQueue.Duplicates)
}

func TestPurgeQueueV1_success_no_messages_on_queue(t *testing.T) {
conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests")
defer func() {
utils.ResetApp()
utils.REQUEST_TRANSFORMER = utils.TransformRequest
}()

utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) {
v := resultingStruct.(*models.PurgeQueueRequest)
*v = models.PurgeQueueRequest{
QueueUrl: fmt.Sprintf("%s/%s", fixtures.BASE_URL, "unit-queue1"),
}
return true
}

expectedResponse := models.PurgeQueueResponse{
Xmlns: models.BASE_XMLNS,
Metadata: models.BASE_RESPONSE_METADATA,
}

_, r := utils.GenerateRequestInfo("POST", "/", nil, true)
code, response := PurgeQueueV1(r)

assert.Equal(t, http.StatusOK, code)
assert.Equal(t, expectedResponse, response)

targetQueue := app.SyncQueues.Queues["unit-queue1"]
assert.Nil(t, targetQueue.Messages)
assert.Equal(t, map[string]time.Time{}, targetQueue.Duplicates)
}

func TestPurgeQueueV1_request_transformer_error(t *testing.T) {
conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests")
defer func() {
utils.ResetApp()
utils.REQUEST_TRANSFORMER = utils.TransformRequest
}()

utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) {
return false
}

_, r := utils.GenerateRequestInfo("POST", "/", nil, true)
code, _ := PurgeQueueV1(r)

assert.Equal(t, http.StatusBadRequest, code)
}

func TestPurgeQueueV1_requested_queue_does_not_exist(t *testing.T) {
conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests")
defer func() {
utils.ResetApp()
utils.REQUEST_TRANSFORMER = utils.TransformRequest
}()

utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) {
v := resultingStruct.(*models.PurgeQueueRequest)
*v = models.PurgeQueueRequest{
QueueUrl: fmt.Sprintf("%s/%s", fixtures.BASE_URL, "garbage"),
}
return true
}

_, r := utils.GenerateRequestInfo("POST", "/", nil, true)
code, _ := PurgeQueueV1(r)

assert.Equal(t, http.StatusBadRequest, code)
}
2 changes: 1 addition & 1 deletion app/gosqs/send_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
requestBody := models.NewSendMessageRequest()
ok := utils.REQUEST_TRANSFORMER(requestBody, req, false)
if !ok {
log.Error("Invalid Request - CreateQueueV1")
log.Error("Invalid Request - SendMessageV1")
return createErrorResponseV1(ErrInvalidParameterValue.Type)
}
messageBody := requestBody.MessageBody
Expand Down
10 changes: 10 additions & 0 deletions app/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,13 @@ type DeleteMessageRequest struct {
}

func (r *DeleteMessageRequest) SetAttributesFromForm(values url.Values) {}

func NewPurgeQueueRequest() *PurgeQueueRequest {
return &PurgeQueueRequest{}
}

type PurgeQueueRequest struct {
QueueUrl string `json:"QueueUrl" schema:"QueueUrl"`
}

func (r *PurgeQueueRequest) SetAttributesFromForm(values url.Values) {}
14 changes: 14 additions & 0 deletions app/models/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,17 @@ func (r SetQueueAttributesResponse) GetResult() interface{} {
func (r SetQueueAttributesResponse) GetRequestId() string {
return r.Metadata.RequestId
}

/*** Purge Queue Response */
type PurgeQueueResponse struct {
Xmlns string `xml:"xmlns,attr,omitempty"`
Metadata app.ResponseMetadata `xml:"ResponseMetadata,omitempty"`
}

func (r PurgeQueueResponse) GetResult() interface{} {
return nil
}

func (r PurgeQueueResponse) GetRequestId() string {
return r.Metadata.RequestId
}
2 changes: 1 addition & 1 deletion app/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ var routingTableV1 = map[string]func(r *http.Request) (int, interfaces.AbstractR
"ReceiveMessage": sqs.ReceiveMessageV1,
"ChangeMessageVisibility": sqs.ChangeMessageVisibilityV1,
"DeleteMessage": sqs.DeleteMessageV1,
"PurgeQueue": sqs.PurgeQueueV1,
}

var routingTable = map[string]http.HandlerFunc{
// SQS
"SendMessageBatch": sqs.SendMessageBatch,
"DeleteMessageBatch": sqs.DeleteMessageBatch,
"GetQueueUrl": sqs.GetQueueUrl,
"PurgeQueue": sqs.PurgeQueue,
"DeleteQueue": sqs.DeleteQueue,

// SNS
Expand Down
2 changes: 1 addition & 1 deletion app/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,13 @@ func TestActionHandler_v0_xml(t *testing.T) {
"ReceiveMessage": sqs.ReceiveMessageV1,
"DeleteMessage": sqs.DeleteMessageV1,
"ChangeMessageVisibility": sqs.ChangeMessageVisibilityV1,
"PurgeQueue": sqs.PurgeQueueV1,
}
routingTable = map[string]http.HandlerFunc{
// SQS
"SendMessageBatch": sqs.SendMessageBatch,
"DeleteMessageBatch": sqs.DeleteMessageBatch,
"GetQueueUrl": sqs.GetQueueUrl,
"PurgeQueue": sqs.PurgeQueue,
"DeleteQueue": sqs.DeleteQueue,

// SNS
Expand Down
6 changes: 0 additions & 6 deletions app/sqs_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ type SendMessageBatchResponse struct {
Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"`
}

/*** Purge Queue Response */
type PurgeQueueResponse struct {
Xmlns string `xml:"xmlns,attr,omitempty"`
Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"`
}

/*** Get Queue Url Response */
type GetQueueUrlResult struct {
QueueUrl string `xml:"QueueUrl,omitempty"`
Expand Down
3 changes: 0 additions & 3 deletions smoke_tests/sqs_create_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ import (
"github.com/gavv/httpexpect/v2"
)

// TODO - Is there a way to also capture the defaults we set and/or load from the config here? (review the xml
// code below)
// NOTE: Actually I think you can just adjust the app.CurrentEnvironment memory space...it travels across tests it seems.
func Test_CreateQueueV1_json_no_attributes(t *testing.T) {
server := generateServer()
defer func() {
Expand Down
Loading

0 comments on commit fcdd208

Please sign in to comment.