Skip to content

Commit

Permalink
Merge pull request #235 from sebcante/feature/allow-redrive-policy-at…
Browse files Browse the repository at this point in the history
…-cfg

sqs: allow redrive-policy to be set from config yaml
  • Loading branch information
p4tin authored May 15, 2021
2 parents 0bee4ed + 8ed2fac commit ba73e9b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 5 deletions.
1 change: 1 addition & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type EnvTopic struct {
type EnvQueue struct {
Name string
ReceiveMessageWaitTimeSeconds int
RedrivePolicy string
MaximumMessageSize int
}

Expand Down
52 changes: 52 additions & 0 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package conf

import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -108,6 +110,19 @@ func LoadYamlConfig(filename string, env string) []string {
}
}

// loop one more time to create queue's RedrivePolicy and assign deadletter queues in case dead letter queue is defined first in the config
for _, queue := range envs[env].Queues {
q := app.SyncQueues.Queues[queue.Name]
if queue.RedrivePolicy != "" {
err := setQueueRedrivePolicy(app.SyncQueues.Queues, q, queue.RedrivePolicy)
if err != nil {
log.Errorf("err: %s", err)
return ports
}
}

}

for _, topic := range envs[env].Topics {
topicArn := "arn:aws:sns:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + topic.Name

Expand Down Expand Up @@ -179,3 +194,40 @@ func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn strin
newSub.SubscriptionArn = subArn
return newSub
}

func setQueueRedrivePolicy(queues map[string]*app.Queue, q *app.Queue, strRedrivePolicy string) error {
// support both int and string maxReceiveCount (Amazon clients use string)
redrivePolicy1 := struct {
MaxReceiveCount int `json:"maxReceiveCount"`
DeadLetterTargetArn string `json:"deadLetterTargetArn"`
}{}
redrivePolicy2 := struct {
MaxReceiveCount string `json:"maxReceiveCount"`
DeadLetterTargetArn string `json:"deadLetterTargetArn"`
}{}
err1 := json.Unmarshal([]byte(strRedrivePolicy), &redrivePolicy1)
err2 := json.Unmarshal([]byte(strRedrivePolicy), &redrivePolicy2)
maxReceiveCount := redrivePolicy1.MaxReceiveCount
deadLetterQueueArn := redrivePolicy1.DeadLetterTargetArn
if err1 != nil && err2 != nil {
return fmt.Errorf("invalid json for queue redrive policy ")
} else if err1 != nil {
maxReceiveCount, _ = strconv.Atoi(redrivePolicy2.MaxReceiveCount)
deadLetterQueueArn = redrivePolicy2.DeadLetterTargetArn
}

if (deadLetterQueueArn != "" && maxReceiveCount == 0) ||
(deadLetterQueueArn == "" && maxReceiveCount != 0) {
return fmt.Errorf("invalid redrive policy values")
}
dlt := strings.Split(deadLetterQueueArn, ":")
deadLetterQueueName := dlt[len(dlt)-1]
deadLetterQueue, ok := queues[deadLetterQueueName]
if !ok {
return fmt.Errorf("deadletter queue not found")
}
q.DeadLetterQueue = deadLetterQueue
q.MaxReceiveCount = maxReceiveCount

return nil
}
24 changes: 19 additions & 5 deletions app/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func TestConfig_CreateQueuesTopicsAndSubscriptions(t *testing.T) {
}

numQueues := len(envs[env].Queues)
if numQueues != 3 {
if numQueues != 4 {
t.Errorf("Expected three queues to be in the environment but got %d\n", numQueues)
}
numQueues = len(app.SyncQueues.Queues)
if numQueues != 5 {
if numQueues != 6 {
t.Errorf("Expected five queues to be in the sqs topics but got %d\n", numQueues)
}

Expand Down Expand Up @@ -78,9 +78,23 @@ func TestConfig_QueueAttributes(t *testing.T) {
t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 1024 but got %d\n", maximumMessageSize)
}

receiveWaitTime = app.SyncQueues.Queues["local-queue2"].ReceiveWaitTimeSecs
if receiveWaitTime != 20 {
t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime)
if app.SyncQueues.Queues["local-queue1"].DeadLetterQueue != nil {
t.Errorf("Expected local-queue1 Queue to be configured without redrive policy\n")
}
if app.SyncQueues.Queues["local-queue1"].MaxReceiveCount != 0 {
t.Errorf("Expected local-queue1 Queue to be configured without redrive policy and therefore MaxReceiveCount: 0 \n")
}

maxReceiveCount := app.SyncQueues.Queues["local-queue3"].MaxReceiveCount
if maxReceiveCount != 100 {
t.Errorf("Expected local-queue2 Queue to be configured with MaxReceiveCount: 3 from RedrivePolicy but got %d\n", maxReceiveCount)
}
dlq := app.SyncQueues.Queues["local-queue3"].DeadLetterQueue
if dlq == nil {
t.Errorf("Expected local-queue3 to have one dead letter queue to redrive to\n")
}
if dlq.Name != "local-queue3-dlq" {
t.Errorf("Expected local-queue3 to have dead letter queue local-queue3-dlq but got %s\n", dlq.Name)
}
maximumMessageSize = app.SyncQueues.Queues["local-queue2"].MaximumMessageSize
if maximumMessageSize != 128 {
Expand Down
2 changes: 2 additions & 0 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Local: # Environment name that can be passed on the
ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time
MaximumMessageSize: 128 # Queue maximum message size (bytes)
- Name: local-queue3 # Queue name
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:000000000000:local-queue3-dlq"}'
- Name: local-queue3-dlq # Queue name
Topics: # List of topic to create at startup
- Name: local-topic1 # Topic name - with some Subscriptions
Subscriptions: # List of Subscriptions to create for this topic (queues will be created as required)
Expand Down

0 comments on commit ba73e9b

Please sign in to comment.