Skip to content

Commit

Permalink
SQS JSON API - ReceiveMessage, ChangeMessageVisibility, DeleteMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew-womeldorf authored and Admiral-Piett committed Sep 20, 2024
1 parent edaa9b7 commit 8f356e7
Show file tree
Hide file tree
Showing 23 changed files with 1,332 additions and 1,052 deletions.
4 changes: 2 additions & 2 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func Publish(w http.ResponseWriter, req *http.Request) {
//Create the response
msgId, _ := common.NewUUID()
uuid, _ := common.NewUUID()
respStruct := app.PublishResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.PublishResult{MessageId: msgId}, app.ResponseMetadata{RequestId: uuid}}
respStruct := app.PublishResponse{Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/", Result: app.PublishResult{MessageId: msgId}, Metadata: app.ResponseMetadata{RequestId: uuid}}
SendResponseBack(w, req, respStruct, content)
}

Expand Down Expand Up @@ -701,7 +701,7 @@ func getMessageAttributesFromRequest(req *http.Request) map[string]app.MessageAt
for _, valueKey := range [...]string{"StringValue", "BinaryValue"} {
value := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.%s", i, valueKey))
if value != "" {
attributes[name] = app.MessageAttributeValue{name, dataType, value, valueKey}
attributes[name] = app.MessageAttributeValue{Name: name, DataType: dataType, Value: value, ValueKey: valueKey}
}
}

Expand Down
82 changes: 82 additions & 0 deletions app/gosqs/change_message_visibility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package gosqs

import (
"net/http"
"strings"
"time"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/interfaces"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
)

func ChangeMessageVisibilityV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
requestBody := models.NewChangeMessageVisibilityRequest()
ok := utils.REQUEST_TRANSFORMER(requestBody, req)
if !ok {
log.Error("Invalid Request - ChangeMessageVisibilityV1")
return createErrorResponseV1(ErrInvalidParameterValue.Type)
}

vars := mux.Vars(req)

queueUrl := requestBody.QueueUrl
queueName := ""
if queueUrl == "" {
queueName = vars["queueName"]
} else {
uriSegments := strings.Split(queueUrl, "/")
queueName = uriSegments[len(uriSegments)-1]
}

receiptHandle := requestBody.ReceiptHandle

visibilityTimeout := requestBody.VisibilityTimeout
if visibilityTimeout > 43200 {
return createErrorResponseV1("ValidationError")
}

if _, ok := app.SyncQueues.Queues[queueName]; !ok {
return createErrorResponseV1("QueueNotFound")
}

app.SyncQueues.Lock()
messageFound := false
for i := 0; i < len(app.SyncQueues.Queues[queueName].Messages); i++ {
queue := app.SyncQueues.Queues[queueName]
msgs := queue.Messages
if msgs[i].ReceiptHandle == receiptHandle {
timeout := app.SyncQueues.Queues[queueName].VisibilityTimeout
if visibilityTimeout == 0 {
msgs[i].ReceiptTime = time.Now().UTC()
msgs[i].ReceiptHandle = ""
msgs[i].VisibilityTimeout = time.Now().Add(time.Duration(timeout) * time.Second)
msgs[i].Retry++
if queue.MaxReceiveCount > 0 &&
queue.DeadLetterQueue != nil &&
msgs[i].Retry > queue.MaxReceiveCount {
queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, msgs[i])
queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...)
i++
}
} else {
msgs[i].VisibilityTimeout = time.Now().Add(time.Duration(visibilityTimeout) * time.Second)
}
messageFound = true
break
}
}
app.SyncQueues.Unlock()
if !messageFound {
return createErrorResponseV1("MessageNotInFlight")
}

respStruct := models.ChangeMessageVisibilityResult{
"http://queue.amazonaws.com/doc/2012-11-05/",
app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000001"}}

return http.StatusOK, &respStruct
}
48 changes: 48 additions & 0 deletions app/gosqs/change_message_visibility_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package gosqs

import (
"net/http"
"testing"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/fixtures"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/stretchr/testify/assert"
)

func TestChangeMessageVisibility_POST_SUCCESS(t *testing.T) {
// create a queue
app.CurrentEnvironment = fixtures.LOCAL_ENVIRONMENT
defer func() {
utils.ResetApp()
}()

q := &app.Queue{
Name: "testing",
Messages: []app.Message{{
MessageBody: []byte("test1"),
ReceiptHandle: "123",
}},
}
app.SyncQueues.Queues["testing"] = q

// The default value for the VisibilityTimeout is the zero value of time.Time
assert.Zero(t, q.Messages[0].VisibilityTimeout)

_, r := utils.GenerateRequestInfo("POST", "/", models.ChangeMessageVisibilityRequest{
QueueUrl: "http://localhost:4100/queue/testing",
ReceiptHandle: "123",
VisibilityTimeout: 0,
}, true)
status, _ := ChangeMessageVisibilityV1(r)
assert.Equal(t, status, http.StatusOK)

// Changing the message visibility increments the time.Time by N seconds
// from the current time.
//
// Given that the current time is relative between calling the endpoint and
// the time being set, we can't reliably assert an exact value. So assert
// that the time.Time value is no longer the default zero value.
assert.NotZero(t, q.Messages[0].VisibilityTimeout)
}
13 changes: 4 additions & 9 deletions app/gosqs/create_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,13 @@ import (
"testing"
"time"

"github.com/mitchellh/copystructure"

"github.com/Admiral-Piett/goaws/app/models"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/fixtures"
"github.com/Admiral-Piett/goaws/app/interfaces"

"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/assert"

"github.com/Admiral-Piett/goaws/app/fixtures"

"github.com/Admiral-Piett/goaws/app"
)

func TestCreateQueueV1_success(t *testing.T) {
Expand Down
63 changes: 63 additions & 0 deletions app/gosqs/delete_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package gosqs

import (
"net/http"
"strings"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/interfaces"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
)

func DeleteMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
requestBody := models.NewDeleteMessageRequest()
ok := utils.REQUEST_TRANSFORMER(requestBody, req)
if !ok {
log.Error("Invalid Request - DeleteMessageV1")
return createErrorResponseV1(ErrInvalidParameterValue.Type)
}

// Retrieve FormValues required
receiptHandle := requestBody.ReceiptHandle

// Retrieve FormValues required
queueUrl := requestBody.QueueUrl
queueName := ""
if queueUrl == "" {
vars := mux.Vars(req)
queueName = vars["queueName"]
} else {
uriSegments := strings.Split(queueUrl, "/")
queueName = uriSegments[len(uriSegments)-1]
}

log.Info("Deleting Message, Queue:", queueName, ", ReceiptHandle:", receiptHandle)

// Find queue/message with the receipt handle and delete
app.SyncQueues.Lock()
defer app.SyncQueues.Unlock()
if _, ok := app.SyncQueues.Queues[queueName]; ok {
for i, msg := range app.SyncQueues.Queues[queueName].Messages {
if msg.ReceiptHandle == receiptHandle {
// Unlock messages for the group
log.Debugf("FIFO Queue %s unlocking group %s:", queueName, msg.GroupID)
app.SyncQueues.Queues[queueName].UnlockGroup(msg.GroupID)
//Delete message from Q
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...)
delete(app.SyncQueues.Queues[queueName].Duplicates, msg.DeduplicationID)

// Create, encode/xml and send response
respStruct := models.DeleteMessageResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000001"}}
return 200, &respStruct
}
}
log.Warning("Receipt Handle not found")
} else {
log.Warning("Queue not found")
}

return createErrorResponseV1("MessageDoesNotExist")
}
38 changes: 38 additions & 0 deletions app/gosqs/delete_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package gosqs

import (
"net/http"
"testing"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/fixtures"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/stretchr/testify/assert"
)

func TestDeleteMessage(t *testing.T) {
app.CurrentEnvironment = fixtures.LOCAL_ENVIRONMENT
defer func() {
utils.ResetApp()
}()

q := &app.Queue{
Name: "testing",
Messages: []app.Message{{
MessageBody: []byte("test1"),
ReceiptHandle: "123",
}},
}

app.SyncQueues.Queues["testing"] = q

_, r := utils.GenerateRequestInfo("POST", "/", models.DeleteMessageRequest{
QueueUrl: "http://localhost:4100/queue/testing",
ReceiptHandle: "123",
}, true)
status, _ := DeleteMessageV1(r)

assert.Equal(t, status, http.StatusOK)
assert.Empty(t, q.Messages)
}
Loading

0 comments on commit 8f356e7

Please sign in to comment.