Skip to content

Commit

Permalink
gosqs: added support to cancel sqs receiveMessage(). Without this if …
Browse files Browse the repository at this point in the history
…the client cancels a waiting receiveMessage(), goaws risks to send a subsequent message from queue to a receive() handler that no client listens to and make this message not visible to pickup by other receiver until WaitTimeSeconds is reached
  • Loading branch information
Sebastien cante committed Oct 17, 2019
1 parent 9436ebb commit 9296ed3
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 11 deletions.
11 changes: 9 additions & 2 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

log "github.com/sirupsen/logrus"

"github.com/gorilla/mux"
"github.com/p4tin/goaws/app"
"github.com/p4tin/goaws/app/common"
Expand Down Expand Up @@ -382,7 +382,14 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) {
messageFound := len(app.SyncQueues.Queues[queueName].Messages)-numberOfHiddenMessagesInQueue(*app.SyncQueues.Queues[queueName]) != 0
app.SyncQueues.RUnlock()
if !messageFound {
time.Sleep(100 * time.Millisecond)
continueTimer := time.NewTimer(100 * time.Millisecond)
select {
case <-req.Context().Done():
continueTimer.Stop()
return // client gave up
case <-continueTimer.C:
continueTimer.Stop()
}
loops--
} else {
break
Expand Down
135 changes: 126 additions & 9 deletions app/gosqs/gosqs_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gosqs

import (
"context"
"encoding/xml"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -1036,6 +1038,113 @@ func TestReceiveMessageWaitTimeEnforced(t *testing.T) {
t.Fatal("handler waited when message was available, expected not to wait")
}
}
func TestReceiveMessage_CanceledByClient(t *testing.T) {
// create a queue
req, err := http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}
form := url.Values{}
form.Add("Action", "CreateQueue")
form.Add("QueueName", "cancel-queue")
form.Add("Attribute.1.Name", "ReceiveMessageWaitTimeSeconds")
form.Add("Attribute.1.Value", "20")
form.Add("Version", "2012-11-05")
req.PostForm = form

rr := httptest.NewRecorder()
http.HandlerFunc(CreateQueue).ServeHTTP(rr, req)

var wg sync.WaitGroup
ctx, cancelReceive := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()
// receive message (that will be canceled)
req, err := http.NewRequestWithContext(ctx, "POST", "/", nil)
if err != nil {
t.Fatal(err)
}

form := url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/cancel-queue")
form.Add("Version", "2012-11-05")
req.PostForm = form

rr := httptest.NewRecorder()
fmt.Println(" http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)")
http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)
fmt.Println(" END http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)")

if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

if ok := strings.Contains(rr.Body.String(), "12345"); ok {
t.Fatal("expecting this Receive() to not pickup this message as it should canceled before the Send()")
}
}()
time.Sleep(100 * time.Millisecond) // let enought time for the Receive go to wait mode
cancelReceive() // cancel the first Receive(), make sure it will not pickup the sent message below
time.Sleep(5 * time.Millisecond)

// send a message
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

form = url.Values{}
form.Add("Action", "SendMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/cancel-queue")
form.Add("MessageBody", "12345")
form.Add("Version", "2012-11-05")
req.PostForm = form

rr = httptest.NewRecorder()
http.HandlerFunc(SendMessage).ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}

// receive message
req, err = http.NewRequest("POST", "/", nil)
if err != nil {
t.Fatal(err)
}

form = url.Values{}
form.Add("Action", "ReceiveMessage")
form.Add("QueueUrl", "http://localhost:4100/queue/cancel-queue")
form.Add("Version", "2012-11-05")
req.PostForm = form

rr = httptest.NewRecorder()

start := time.Now()
http.HandlerFunc(ReceiveMessage).ServeHTTP(rr, req)
elapsed := time.Since(start)

if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got \n%v want %v",
status, http.StatusOK)
}
if ok := strings.Contains(rr.Body.String(), "12345"); !ok {
t.Fatal("handler should return a message")
}
if elapsed > 1*time.Second {
t.Fatal("handler waited when message was available, expected not to wait")
}

if timedout := waitTimeout(&wg, 2*time.Second); timedout {
t.Errorf("expected receive() in goroutine to exit quickly due to cancelReceive() called")
}
}

func TestReceiveMessage_WithConcurrentDeleteQueue(t *testing.T) {
// create a queue
Expand Down Expand Up @@ -1118,15 +1227,7 @@ func TestReceiveMessage_WithConcurrentDeleteQueue(t *testing.T) {
}
}()

c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return // completed successfully
case <-time.After(2 * time.Second):
if timedout := waitTimeout(&wg, 2*time.Second); timedout {
t.Errorf("concurrent handlers timeout, expecting both to return within timeout")
}

Expand Down Expand Up @@ -1362,3 +1463,19 @@ func TestSendingAndReceivingFromFIFOQueueReturnsSameMessageOnError(t *testing.T)

done <- struct{}{}
}

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}

0 comments on commit 9296ed3

Please sign in to comment.