From 16b13231601ad81eb86a065002a08ef7f330e2d6 Mon Sep 17 00:00:00 2001 From: Jeff Rhyason Date: Mon, 14 Jun 2021 10:16:06 -0700 Subject: [PATCH] honour DelaySeconds queue attribute and SendMessage parameter --- app/gosqs/gosqs.go | 20 +-- app/gosqs/gosqs_test.go | 193 +++++++++++++++++++++++++++-- app/gosqs/queue_attributes.go | 4 + app/gosqs/queue_attributes_test.go | 3 +- app/sqs.go | 12 +- 5 files changed, 212 insertions(+), 20 deletions(-) diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index 240180af..cdd81a87 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -158,6 +158,7 @@ func CreateQueue(w http.ResponseWriter, req *http.Request) { func SendMessage(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/xml") + req.ParseForm() messageBody := req.FormValue("MessageBody") messageGroupID := req.FormValue("MessageGroupId") messageDeduplicationID := req.FormValue("MessageDeduplicationId") @@ -180,13 +181,18 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { return } - if (app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 && - len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize) { + if app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 && + len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize { // Message size is too big createErrorResponse(w, req, "MessageTooBig") return } + delaySecs := app.SyncQueues.Queues[queueName].DelaySecs + if mv := req.FormValue("DelaySeconds"); mv != "" { + delaySecs, _ = strconv.Atoi(mv) + } + log.Println("Putting Message in Queue:", queueName) msg := app.Message{MessageBody: []byte(messageBody)} if len(messageAttributes) > 0 { @@ -198,6 +204,7 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { msg.GroupID = messageGroupID msg.DeduplicationID = messageDeduplicationID msg.SentTime = time.Now() + msg.DelaySecs = delaySecs app.SyncQueues.Lock() fifoSeqNumber := "" @@ -272,7 +279,6 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) { return } keyIndex, err := strconv.Atoi(keySegments[1]) - if err != nil { createErrorResponse(w, req, "Error") return @@ -377,6 +383,7 @@ func SendMessageBatch(w http.ResponseWriter, req *http.Request) { func ReceiveMessage(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/xml") + req.ParseForm() waitTimeSeconds := 0 wts := req.FormValue("WaitTimeSeconds") @@ -504,8 +511,8 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) { func numberOfHiddenMessagesInQueue(queue app.Queue) int { num := 0 - for i := range queue.Messages { - if queue.Messages[i].ReceiptHandle != "" { + for _, m := range queue.Messages { + if m.ReceiptHandle != "" || m.DelaySecs > 0 && time.Now().Before(m.SentTime.Add(time.Duration(m.DelaySecs)*time.Second)) { num++ } } @@ -612,7 +619,6 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { keySegments := strings.Split(k, ".") if keySegments[0] == "DeleteMessageBatchRequestEntry" { keyIndex, err := strconv.Atoi(keySegments[1]) - if err != nil { createErrorResponse(w, req, "Error") return @@ -836,7 +842,7 @@ func GetQueueAttributes(w http.ResponseWriter, req *http.Request) { attribs := make([]app.Attribute, 0, 0) attr := app.Attribute{Name: "VisibilityTimeout", Value: strconv.Itoa(queue.TimeoutSecs)} attribs = append(attribs, attr) - attr = app.Attribute{Name: "DelaySeconds", Value: "0"} + attr = app.Attribute{Name: "DelaySeconds", Value: strconv.Itoa(queue.DelaySecs)} attribs = append(attribs, attr) attr = app.Attribute{Name: "ReceiveMessageWaitTimeSeconds", Value: strconv.Itoa(queue.ReceiveWaitTimeSecs)} attribs = append(attribs, attr) diff --git a/app/gosqs/gosqs_test.go b/app/gosqs/gosqs_test.go index 1181e81e..4581a25c 100644 --- a/app/gosqs/gosqs_test.go +++ b/app/gosqs/gosqs_test.go @@ -91,7 +91,6 @@ func TestListQueues_POST_Success(t *testing.T) { t.Errorf("handler returned unexpected body: got %v", rr.Body.String()) } - } func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) { @@ -133,12 +132,12 @@ func TestCreateQueuehandler_POST_CreateQueue(t *testing.T) { rr.Body.String(), expected) } expectedQueue := &app.Queue{ - Name: queueName, - URL: "http://://" + queueName, - Arn: "arn:aws:sqs:::" + queueName, - TimeoutSecs: 60, + Name: queueName, + URL: "http://://" + queueName, + Arn: "arn:aws:sqs:::" + queueName, + TimeoutSecs: 60, MaximumMessageSize: 2048, - Duplicates: make(map[string]time.Time), + Duplicates: make(map[string]time.Time), } actualQueue := app.SyncQueues.Queues[queueName] if !reflect.DeepEqual(expectedQueue, actualQueue) { @@ -186,7 +185,7 @@ func TestCreateFIFOQueuehandler_POST_CreateQueue(t *testing.T) { Arn: "arn:aws:sqs:::" + queueName, TimeoutSecs: 60, IsFIFO: true, - Duplicates: make(map[string]time.Time), + Duplicates: make(map[string]time.Time), } actualQueue := app.SyncQueues.Queues[queueName] if !reflect.DeepEqual(expectedQueue, actualQueue) { @@ -1018,7 +1017,6 @@ func TestDeadLetterQueue(t *testing.T) { if len(deadLetterQueue.Messages) == 0 { t.Fatal("expected a message") } - } func TestReceiveMessageWaitTimeEnforced(t *testing.T) { @@ -1122,6 +1120,7 @@ func TestReceiveMessageWaitTimeEnforced(t *testing.T) { t.Fatal("handler waited when message was available, expected not to wait") } } + func TestReceiveMessage_CanceledByClient(t *testing.T) { // create a queue req, err := http.NewRequest("POST", "/", nil) @@ -1313,7 +1312,96 @@ func TestReceiveMessage_WithConcurrentDeleteQueue(t *testing.T) { if timedout := waitTimeout(&wg, 2*time.Second); timedout { t.Errorf("concurrent handlers timeout, expecting both to return within timeout") } +} + +func TestReceiveMessageDelaySeconds(t *testing.T) { + // create a queue + req, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form := url.Values{} + form.Add("Action", "CreateQueue") + form.Add("QueueName", "delay-seconds-queue") + form.Add("Attribute.1.Name", "DelaySeconds") + form.Add("Attribute.1.Value", "2") + form.Add("Version", "2012-11-05") + req.PostForm = form + + rr := httptest.NewRecorder() + http.HandlerFunc(CreateQueue).ServeHTTP(rr, req) + + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + + // send a message + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form = url.Values{} + form.Add("Action", "SendMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/delay-seconds-queue") + form.Add("MessageBody", "1") + form.Add("Version", "2012-11-05") + req.PostForm = form + rr = httptest.NewRecorder() + http.HandlerFunc(SendMessage).ServeHTTP(rr, req) + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + + // receive message before delay is up + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form = url.Values{} + form.Add("Action", "ReceiveMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/delay-seconds-queue") + form.Add("Version", "2012-11-05") + req.PostForm = form + rr = httptest.NewRecorder() + http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req) + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + if ok := strings.Contains(rr.Body.String(), ""); ok { + t.Fatal("handler should not return a message") + } + // receive message with wait should return after delay + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form = url.Values{} + form.Add("Action", "ReceiveMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/delay-seconds-queue") + form.Add("WaitTimeSeconds", "10") + form.Add("Version", "2012-11-05") + req.PostForm = form + rr = httptest.NewRecorder() + start := time.Now() + http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req) + elapsed := time.Since(start) + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + if elapsed < 1*time.Second { + t.Errorf("handler didn't wait at all") + } + if ok := strings.Contains(rr.Body.String(), ""); !ok { + t.Errorf("handler should return a message") + } + if elapsed > 4*time.Second { + t.Errorf("handler didn't need to wait all WaitTimeSeconds=10, only DelaySeconds=2") + } } func TestSetQueueAttributes_POST_QueueNotFound(t *testing.T) { @@ -1762,6 +1850,95 @@ func TestSendMessage_POST_DuplicatationEnabledOnFifoQueue(t *testing.T) { } } +func TestSendMessage_POST_DelaySeconds(t *testing.T) { + // create a queue + req, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form := url.Values{} + form.Add("Action", "CreateQueue") + form.Add("QueueName", "sendmessage-delay") + form.Add("Version", "2012-11-05") + req.PostForm = form + + rr := httptest.NewRecorder() + http.HandlerFunc(CreateQueue).ServeHTTP(rr, req) + + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + + // send a message + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form = url.Values{} + form.Add("Action", "SendMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/sendmessage-delay") + form.Add("MessageBody", "1") + form.Add("DelaySeconds", "2") + form.Add("Version", "2012-11-05") + req.PostForm = form + rr = httptest.NewRecorder() + http.HandlerFunc(SendMessage).ServeHTTP(rr, req) + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + + // receive message before delay is up + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form = url.Values{} + form.Add("Action", "ReceiveMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/sendmessage-delay") + form.Add("Version", "2012-11-05") + req.PostForm = form + rr = httptest.NewRecorder() + http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req) + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + if ok := strings.Contains(rr.Body.String(), ""); ok { + t.Fatal("handler should not return a message") + } + + // receive message with wait should return after delay + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + form = url.Values{} + form.Add("Action", "ReceiveMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/sendmessage-delay") + form.Add("WaitTimeSeconds", "10") + form.Add("Version", "2012-11-05") + req.PostForm = form + rr = httptest.NewRecorder() + start := time.Now() + http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req) + elapsed := time.Since(start) + if status := rr.Code; status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + if elapsed < 1*time.Second { + t.Errorf("handler didn't wait at all") + } + if ok := strings.Contains(rr.Body.String(), ""); !ok { + t.Errorf("handler should return a message") + } + if elapsed > 4*time.Second { + t.Errorf("handler didn't need to wait all WaitTimeSeconds=10, only DelaySeconds=2") + } +} + // waitTimeout waits for the waitgroup for the specified max timeout. // Returns true if waiting timed out. // credits: https://stackoverflow.com/questions/32840687/timeout-for-waitgroup-wait diff --git a/app/gosqs/queue_attributes.go b/app/gosqs/queue_attributes.go index 35745c9f..a3b65e54 100644 --- a/app/gosqs/queue_attributes.go +++ b/app/gosqs/queue_attributes.go @@ -78,6 +78,10 @@ func validateAndSetQueueAttributes(q *app.Queue, u url.Values) error { q.DeadLetterQueue = deadLetterQueue q.MaxReceiveCount = maxReceiveCount } + delaySecs, _ := strconv.Atoi(attr["DelaySeconds"]) + if delaySecs != 0 { + q.DelaySecs = delaySecs + } return nil } diff --git a/app/gosqs/queue_attributes_test.go b/app/gosqs/queue_attributes_test.go index 7886d01a..f84f5d09 100644 --- a/app/gosqs/queue_attributes_test.go +++ b/app/gosqs/queue_attributes_test.go @@ -17,7 +17,7 @@ func TestApplyQueueAttributes(t *testing.T) { q := &app.Queue{TimeoutSecs: 30} u := url.Values{} u.Add("Attribute.1.Name", "DelaySeconds") - u.Add("Attribute.1.Value", "20") + u.Add("Attribute.1.Value", "25") u.Add("Attribute.2.Name", "VisibilityTimeout") u.Add("Attribute.2.Value", "60") u.Add("Attribute.3.Name", "Policy") @@ -31,6 +31,7 @@ func TestApplyQueueAttributes(t *testing.T) { expected := &app.Queue{ TimeoutSecs: 60, ReceiveWaitTimeSecs: 20, + DelaySecs: 25, MaxReceiveCount: 4, DeadLetterQueue: deadLetterQueue, } diff --git a/app/sqs.go b/app/sqs.go index 4a701cf7..564781bc 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -3,12 +3,13 @@ package app import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "math/rand" "strconv" "strings" "sync" "time" + + log "github.com/sirupsen/logrus" ) type SqsErrorType struct { @@ -37,7 +38,8 @@ type Message struct { MessageAttributes map[string]MessageAttributeValue GroupID string DeduplicationID string - SentTime time.Time + SentTime time.Time + DelaySecs int } func (m *Message) IsReadyForReceipt() bool { @@ -46,10 +48,11 @@ func (m *Message) IsReadyForReceipt() bool { log.Error(err) return true } - return m.SentTime.Add(randomLatency).Before(time.Now()) + showAt := m.SentTime.Add(randomLatency).Add(time.Duration(m.DelaySecs) * time.Second) + return showAt.Before(time.Now()) } -func getRandomLatency() (time.Duration, error){ +func getRandomLatency() (time.Duration, error) { min := CurrentEnvironment.RandomLatency.Min max := CurrentEnvironment.RandomLatency.Max if min == 0 && max == 0 { @@ -81,6 +84,7 @@ type Queue struct { Arn string TimeoutSecs int ReceiveWaitTimeSecs int + DelaySecs int MaximumMessageSize int Messages []Message DeadLetterQueue *Queue