Skip to content

Commit

Permalink
Migrate SendMessage (+15 squashed commits)
Browse files Browse the repository at this point in the history
Squashed commits:
[7342cdd] Update
[f1de347] Add tests
[28011fb] fix test
[a2d30b4] Add SetAttributesFromForm test
[8464892] Add xml smoke test
[fe667e4] Update UT
[14c8798] Update by reviews
[29dc396] Add a test about message size exceeding
[9083ad0] Add a test
[4efb39a] Fix MessageAttributes propagation
[85b0f0b] separate unit test
[29b45a3] move model into same model files + update a test
[06db0ee] add 1 smoke test
[031e67e] test commit
[f3062f3] Migrate SQS SendMessage w/o smoke test

test commit

add 1 smoke test

move model into same model files + update a test

separate unit test

Fix MessageAttributes propagation

Add a test

Add a test about message size exceeding

Update by reviews

Update UT

Add xml smoke test

Add SetAttributesFromForm test

fix test

Add tests

Update

add TODO comment
  • Loading branch information
kojisaikiAtSony authored and Admiral-Piett committed Sep 20, 2024
1 parent 8ec7d7e commit edaa9b7
Show file tree
Hide file tree
Showing 12 changed files with 890 additions and 292 deletions.
88 changes: 1 addition & 87 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func init() {
app.SqsErrors["InvalidVisibilityTimeout"] = err8
err9 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageNotInFlight", Code: "AWS.SimpleQueueService.MessageNotInFlight", Message: "The message referred to isn't in flight."}
app.SqsErrors["MessageNotInFlight"] = err9
err10 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageTooBig", Code: "InvalidMessageContents", Message: "The message size exceeds the limit."}
err10 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "MessageTooBig", Code: "InvalidParameterValue", Message: "The message size exceeds the limit."}
app.SqsErrors["MessageTooBig"] = err10
app.SqsErrors[ErrInvalidParameterValue.Type] = *ErrInvalidParameterValue
app.SqsErrors[ErrInvalidAttributeValue.Type] = *ErrInvalidAttributeValue
Expand Down Expand Up @@ -95,92 +95,6 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
}
}

func SendMessage(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
req.ParseForm()
messageBody := req.FormValue("MessageBody")
messageGroupID := req.FormValue("MessageGroupId")
messageDeduplicationID := req.FormValue("MessageDeduplicationId")
messageAttributes := extractMessageAttributes(req, "")

queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String())

queueName := ""
if queueUrl == "" {
vars := mux.Vars(req)
queueName = vars["queueName"]
} else {
uriSegments := strings.Split(queueUrl, "/")
queueName = uriSegments[len(uriSegments)-1]
}

if _, ok := app.SyncQueues.Queues[queueName]; !ok {
// Queue does not exist
createErrorResponse(w, req, "QueueNotFound")
return
}

if app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 &&
len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize {
// Message size is too big
createErrorResponse(w, req, "MessageTooBig")
return
}

delaySecs := app.SyncQueues.Queues[queueName].DelaySeconds
if mv := req.FormValue("DelaySeconds"); mv != "" {
delaySecs, _ = strconv.Atoi(mv)
}

log.Println("Putting Message in Queue:", queueName)
msg := app.Message{MessageBody: []byte(messageBody)}
if len(messageAttributes) > 0 {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
}
msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
msg.GroupID = messageGroupID
msg.DeduplicationID = messageDeduplicationID
msg.SentTime = time.Now()
msg.DelaySecs = delaySecs

app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(messageGroupID)
}

if !app.SyncQueues.Queues[queueName].IsDuplicate(messageDeduplicationID) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", messageDeduplicationID, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(messageDeduplicationID)
app.SyncQueues.Unlock()
log.Infof("%s: Queue: %s, Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)

respStruct := app.SendMessageResponse{
Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/",
Result: app.SendMessageResult{
MD5OfMessageAttributes: msg.MD5OfMessageAttributes,
MD5OfMessageBody: msg.MD5OfMessageBody,
MessageId: msg.Uuid,
SequenceNumber: fifoSeqNumber,
},
Metadata: app.ResponseMetadata{
RequestId: "00000000-0000-0000-0000-000000000000",
},
}

enc := xml.NewEncoder(w)
enc.Indent(" ", " ")
if err := enc.Encode(respStruct); err != nil {
log.Printf("error: %v\n", err)
}
}

type SendEntry struct {
Id string
MessageBody string
Expand Down
Loading

0 comments on commit edaa9b7

Please sign in to comment.