Skip to content

Commit

Permalink
Add SendMessageBatchV1 for JSON support
Browse files Browse the repository at this point in the history
fixes comment
  • Loading branch information
Dai.Otsuka authored and Admiral-Piett committed Sep 20, 2024
1 parent 2374d82 commit e5f9bdb
Show file tree
Hide file tree
Showing 10 changed files with 1,026 additions and 440 deletions.
136 changes: 0 additions & 136 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
log "github.com/sirupsen/logrus"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/common"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -68,141 +67,6 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
}
}

type SendEntry struct {
Id string
MessageBody string
MessageAttributes map[string]app.MessageAttributeValue
MessageGroupId string
MessageDeduplicationId string
}

func SendMessageBatch(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
req.ParseForm()

queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String())
queueName := ""
if queueUrl == "" {
vars := mux.Vars(req)
queueName = vars["queueName"]
} else {
uriSegments := strings.Split(queueUrl, "/")
queueName = uriSegments[len(uriSegments)-1]
}

if _, ok := app.SyncQueues.Queues[queueName]; !ok {
createErrorResponse(w, req, "QueueNotFound")
return
}

sendEntries := []SendEntry{}

for k, v := range req.Form {
keySegments := strings.Split(k, ".")
if keySegments[0] == "SendMessageBatchRequestEntry" {
if len(keySegments) < 3 {
createErrorResponse(w, req, "EmptyBatchRequest")
return
}
keyIndex, err := strconv.Atoi(keySegments[1])
if err != nil {
createErrorResponse(w, req, "Error")
return
}

if len(sendEntries) < keyIndex {
newSendEntries := make([]SendEntry, keyIndex)
copy(newSendEntries, sendEntries)
sendEntries = newSendEntries
}

if keySegments[2] == "Id" {
sendEntries[keyIndex-1].Id = v[0]
}

if keySegments[2] == "MessageBody" {
sendEntries[keyIndex-1].MessageBody = v[0]
}

if keySegments[2] == "MessageGroupId" {
sendEntries[keyIndex-1].MessageGroupId = v[0]
}

if keySegments[2] == "MessageDeduplicationId" {
sendEntries[keyIndex-1].MessageDeduplicationId = v[0]
}
}
}

if len(sendEntries) == 0 {
createErrorResponse(w, req, "EmptyBatchRequest")
return
}

if len(sendEntries) > 10 {
createErrorResponse(w, req, "TooManyEntriesInBatchRequest")
return
}
ids := map[string]struct{}{}
for _, v := range sendEntries {
if _, ok := ids[v.Id]; ok {
createErrorResponse(w, req, "BatchEntryIdsNotDistinct")
return
}
ids[v.Id] = struct{}{}
}

sentEntries := make([]app.SendMessageBatchResultEntry, 0)
log.Println("Putting Message in Queue:", queueName)
for _, sendEntry := range sendEntries {
msg := app.Message{MessageBody: []byte(sendEntry.MessageBody)}
if len(sendEntry.MessageAttributes) > 0 {
msg.MessageAttributes = sendEntry.MessageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(sendEntry.MessageAttributes)
}
msg.MD5OfMessageBody = common.GetMD5Hash(sendEntry.MessageBody)
msg.GroupID = sendEntry.MessageGroupId
msg.DeduplicationID = sendEntry.MessageDeduplicationId
msg.Uuid, _ = common.NewUUID()
msg.SentTime = time.Now()
app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(sendEntry.MessageGroupId)
}

if !app.SyncQueues.Queues[queueName].IsDuplicate(sendEntry.MessageDeduplicationId) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", sendEntry.MessageDeduplicationId, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(sendEntry.MessageDeduplicationId)

app.SyncQueues.Unlock()
se := app.SendMessageBatchResultEntry{
Id: sendEntry.Id,
MessageId: msg.Uuid,
MD5OfMessageBody: msg.MD5OfMessageBody,
MD5OfMessageAttributes: msg.MD5OfMessageAttributes,
SequenceNumber: fifoSeqNumber,
}
sentEntries = append(sentEntries, se)
log.Infof("%s: Queue: %s, Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)
}

respStruct := app.SendMessageBatchResponse{
"http://queue.amazonaws.com/doc/2012-11-05/",
app.SendMessageBatchResult{Entry: sentEntries},
app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000001"}}

enc := xml.NewEncoder(w)
enc.Indent(" ", " ")
if err := enc.Encode(respStruct); err != nil {
log.Printf("error: %v\n", err)
}
}

func numberOfHiddenMessagesInQueue(queue app.Queue) int {
num := 0
for _, m := range queue.Messages {
Expand Down
Loading

0 comments on commit e5f9bdb

Please sign in to comment.