Skip to content

Commit

Permalink
Refactor for CreateQueue for V1 JSON support
Browse files Browse the repository at this point in the history
  • Loading branch information
dhumphreys01 authored and Admiral-Piett committed Sep 20, 2024
1 parent 8c19119 commit a86ab6c
Show file tree
Hide file tree
Showing 37 changed files with 2,684 additions and 436 deletions.
4 changes: 4 additions & 0 deletions app/cmd/goaws.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"time"

"github.com/Admiral-Piett/goaws/app/utils"

"github.com/Admiral-Piett/goaws/app"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -61,6 +63,8 @@ func main() {
quit := make(chan struct{}, 0)
go gosqs.PeriodicTasks(1*time.Second, quit)

utils.InitializeDecoders()

if len(portNumbers) == 1 {
log.Warnf("GoAws listening on: 0.0.0.0:%s", portNumbers[0])
err := http.ListenAndServe("0.0.0.0:"+portNumbers[0], r)
Expand Down
24 changes: 11 additions & 13 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type EnvQueue struct {
RedrivePolicy string
MaximumMessageSize int
VisibilityTimeout int
MessageRetentionPeriod int
}

type EnvQueueAttributes struct {
VisibilityTimeout int
ReceiveMessageWaitTimeSeconds int
MaximumMessageSize int
MessageRetentionPeriod int // seconds
}

type Environment struct {
Expand All @@ -45,25 +47,21 @@ type Environment struct {
RandomLatency RandomLatency
}

var CurrentEnvironment Environment
// CurrentEnvironment should get overwritten when the app starts up and loads the config. For the
// sake of generating "partial" apps piece-meal during test automation we'll slap these placeholder
// values in here so the resource URLs aren't wonky like `http://://new-queue`.
var CurrentEnvironment = Environment{
Host: "host",
Port: "port",
Region: "region",
AccountID: "accountID",
}

/*** Common ***/
type ResponseMetadata struct {
RequestId string `xml:"RequestId"`
}

/*** Error Responses ***/
type ErrorResult struct {
Type string `xml:"Type,omitempty"`
Code string `xml:"Code,omitempty"`
Message string `xml:"Message,omitempty"`
}

type ErrorResponse struct {
Result ErrorResult `xml:"Error"`
RequestId string `xml:"RequestId"`
}

type RandomLatency struct {
Min int
Max int
Expand Down
53 changes: 33 additions & 20 deletions app/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,22 @@ func LoadYamlConfig(filename string, env string) []string {
}
}

if app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout == 0 {
if app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout <= 0 {
app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout = 30
}

if app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize == 0 {
if app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize <= 0 {
app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize = 262144 // 256K
}

if app.CurrentEnvironment.QueueAttributeDefaults.MessageRetentionPeriod <= 0 {
app.CurrentEnvironment.QueueAttributeDefaults.MessageRetentionPeriod = 345600 // 4 days
}

if app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds <= 0 {
app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds = 0
}

if app.CurrentEnvironment.AccountID == "" {
app.CurrentEnvironment.AccountID = "queue"
}
Expand Down Expand Up @@ -113,16 +121,21 @@ func LoadYamlConfig(filename string, env string) []string {
queue.VisibilityTimeout = app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout
}

if queue.MessageRetentionPeriod == 0 {
queue.MessageRetentionPeriod = app.CurrentEnvironment.QueueAttributeDefaults.MessageRetentionPeriod
}

app.SyncQueues.Queues[queue.Name] = &app.Queue{
Name: queue.Name,
TimeoutSecs: queue.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: queue.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: queue.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(queue.Name),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
Name: queue.Name,
VisibilityTimeout: queue.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveMessageWaitTimeSeconds: queue.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: queue.MaximumMessageSize,
MessageRetentionPeriod: queue.MessageRetentionPeriod,
IsFIFO: app.HasFIFOQueueName(queue.Name),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
}

Expand Down Expand Up @@ -192,15 +205,15 @@ func createSqsSubscription(configSubscription app.EnvSubsciption, topicArn strin
}
queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":" + app.CurrentEnvironment.AccountID + ":" + configSubscription.QueueName
app.SyncQueues.Queues[configSubscription.QueueName] = &app.Queue{
Name: configSubscription.QueueName,
TimeoutSecs: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveWaitTimeSecs: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
Name: configSubscription.QueueName,
VisibilityTimeout: app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout,
Arn: queueArn,
URL: queueUrl,
ReceiveMessageWaitTimeSeconds: app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds,
MaximumMessageSize: app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize,
IsFIFO: app.HasFIFOQueueName(configSubscription.QueueName),
EnableDuplicates: app.CurrentEnvironment.EnableDuplicates,
Duplicates: make(map[string]time.Time),
}
}
qArn := app.SyncQueues.Queues[configSubscription.QueueName].Arn
Expand Down
75 changes: 33 additions & 42 deletions app/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,71 +61,62 @@ func TestConfig_CreateQueuesTopicsAndSubscriptions(t *testing.T) {
}

func TestConfig_QueueAttributes(t *testing.T) {
var emptyQueue *app.Queue
env := "Local"
port := LoadYamlConfig("./mock-data/mock-config.yaml", env)
if port[0] != "4100" {
t.Errorf("Expected port number 4100 but got %s\n", port)
}

receiveWaitTime := app.SyncQueues.Queues["local-queue1"].ReceiveWaitTimeSecs
if receiveWaitTime != 10 {
t.Errorf("Expected local-queue1 Queue to be configured with ReceiveMessageWaitTimeSeconds: 10 but got %d\n", receiveWaitTime)
}
timeoutSecs := app.SyncQueues.Queues["local-queue1"].TimeoutSecs
if timeoutSecs != 10 {
t.Errorf("Expected local-queue1 Queue to be configured with VisibilityTimeout: 10 but got %d\n", timeoutSecs)
}
maximumMessageSize := app.SyncQueues.Queues["local-queue1"].MaximumMessageSize
if maximumMessageSize != 1024 {
t.Errorf("Expected local-queue1 Queue to be configured with MaximumMessageSize: 1024 but got %d\n", maximumMessageSize)
}

if app.SyncQueues.Queues["local-queue1"].DeadLetterQueue != nil {
t.Errorf("Expected local-queue1 Queue to be configured without redrive policy\n")
}
if app.SyncQueues.Queues["local-queue1"].MaxReceiveCount != 0 {
t.Errorf("Expected local-queue1 Queue to be configured without redrive policy and therefore MaxReceiveCount: 0 \n")
}
assert.Equal(t, 10, app.SyncQueues.Queues["local-queue1"].ReceiveMessageWaitTimeSeconds)
assert.Equal(t, 10, app.SyncQueues.Queues["local-queue1"].VisibilityTimeout)
assert.Equal(t, 1024, app.SyncQueues.Queues["local-queue1"].MaximumMessageSize)
assert.Equal(t, emptyQueue, app.SyncQueues.Queues["local-queue1"].DeadLetterQueue)
assert.Equal(t, 0, app.SyncQueues.Queues["local-queue1"].MaxReceiveCount)
assert.Equal(t, 445600, app.SyncQueues.Queues["local-queue1"].MessageRetentionPeriod)
assert.Equal(t, 100, app.SyncQueues.Queues["local-queue3"].MaxReceiveCount)

maxReceiveCount := app.SyncQueues.Queues["local-queue3"].MaxReceiveCount
if maxReceiveCount != 100 {
t.Errorf("Expected local-queue2 Queue to be configured with MaxReceiveCount: 3 from RedrivePolicy but got %d\n", maxReceiveCount)
}
dlq := app.SyncQueues.Queues["local-queue3"].DeadLetterQueue
if dlq == nil {
t.Errorf("Expected local-queue3 to have one dead letter queue to redrive to\n")
}
if dlq.Name != "local-queue3-dlq" {
t.Errorf("Expected local-queue3 to have dead letter queue local-queue3-dlq but got %s\n", dlq.Name)
}
maximumMessageSize = app.SyncQueues.Queues["local-queue2"].MaximumMessageSize
if maximumMessageSize != 128 {
t.Errorf("Expected local-queue2 Queue to be configured with MaximumMessageSize: 128 but got %d\n", maximumMessageSize)
}

timeoutSecs = app.SyncQueues.Queues["local-queue2"].TimeoutSecs
if timeoutSecs != 150 {
t.Errorf("Expected local-queue2 Queue to be configured with VisibilityTimeout: 150 but got %d\n", timeoutSecs)
}
assert.Equal(t, "local-queue3-dlq", app.SyncQueues.Queues["local-queue3"].DeadLetterQueue.Name)
assert.Equal(t, 128, app.SyncQueues.Queues["local-queue2"].MaximumMessageSize)
assert.Equal(t, 150, app.SyncQueues.Queues["local-queue2"].VisibilityTimeout)
assert.Equal(t, 245600, app.SyncQueues.Queues["local-queue2"].MessageRetentionPeriod)
}

func TestConfig_NoQueueAttributeDefaults(t *testing.T) {
env := "NoQueueAttributeDefaults"
LoadYamlConfig("./mock-data/mock-config.yaml", env)

receiveWaitTime := app.SyncQueues.Queues["local-queue1"].ReceiveWaitTimeSecs
receiveWaitTime := app.SyncQueues.Queues["local-queue1"].ReceiveMessageWaitTimeSeconds
if receiveWaitTime != 0 {
t.Errorf("Expected local-queue1 Queue to be configured with ReceiveMessageWaitTimeSeconds: 0 but got %d\n", receiveWaitTime)
}
timeoutSecs := app.SyncQueues.Queues["local-queue1"].TimeoutSecs
timeoutSecs := app.SyncQueues.Queues["local-queue1"].VisibilityTimeout
if timeoutSecs != 30 {
t.Errorf("Expected local-queue1 Queue to be configured with VisibilityTimeout: 30 but got %d\n", timeoutSecs)
}

receiveWaitTime = app.SyncQueues.Queues["local-queue2"].ReceiveWaitTimeSecs
receiveWaitTime = app.SyncQueues.Queues["local-queue2"].ReceiveMessageWaitTimeSeconds
if receiveWaitTime != 20 {
t.Errorf("Expected local-queue2 Queue to be configured with ReceiveMessageWaitTimeSeconds: 20 but got %d\n", receiveWaitTime)
}

messageRetentionPeriod := app.SyncQueues.Queues["local-queue1"].MessageRetentionPeriod
if messageRetentionPeriod != 345600 {
t.Errorf("Expected local-queue2 Queue to be configured with VisibilityTimeout: 150 but got %d\n", timeoutSecs)
}
}

func TestConfig_invalid_config_resorts_to_default_queue_attributes(t *testing.T) {
env := "missing"
port := LoadYamlConfig("./mock-data/mock-config.yaml", env)
if port[0] != "4100" {
t.Errorf("Expected port number 4100 but got %s\n", port)
}

assert.Equal(t, 262144, app.CurrentEnvironment.QueueAttributeDefaults.MaximumMessageSize)
assert.Equal(t, 345600, app.CurrentEnvironment.QueueAttributeDefaults.MessageRetentionPeriod)
assert.Equal(t, 0, app.CurrentEnvironment.QueueAttributeDefaults.ReceiveMessageWaitTimeSeconds)
assert.Equal(t, 30, app.CurrentEnvironment.QueueAttributeDefaults.VisibilityTimeout)
}

func TestConfig_LoadYamlConfig_finds_default_config(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ Local: # Environment name that can be passed on the
VisibilityTimeout: 10 # message visibility timeout
ReceiveMessageWaitTimeSeconds: 10 # receive message max wait time
MaximumMessageSize: 1024 # maximum message size (bytes)
MessageRetentionPeriod: 445600 # time period to retain messages (seconds) NOTE: Functionality not implemented
Queues: # List of queues to create at startup
- Name: local-queue1 # Queue name
- Name: local-queue2 # Queue name
ReceiveMessageWaitTimeSeconds: 20 # Queue receive message max wait time
MaximumMessageSize: 128 # Queue maximum message size (bytes)
VisibilityTimeout: 150 # Queue visibility timeout
MessageRetentionPeriod: 245600
- Name: local-queue3 # Queue name
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:100010001000:local-queue3-dlq"}'
- Name: local-queue3-dlq # Queue name
Expand Down
127 changes: 127 additions & 0 deletions app/fixtures/environment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package fixtures

import "github.com/Admiral-Piett/goaws/app"

var ENV_SUBSCRIPTION_QUEUE_4 = app.EnvSubsciption{
Protocol: "",
EndPoint: "",
TopicArn: "",
QueueName: "local-queue4",
Raw: false,
FilterPolicy: "",
}

var ENV_SUBSCRIPTION_QUEUE_5 = app.EnvSubsciption{
Protocol: "",
EndPoint: "",
TopicArn: "",
QueueName: "local-queue5",
Raw: true,
FilterPolicy: "{\"foo\":[\"bar\"]}",
}

var LOCAL_ENV_TOPIC_1 = app.EnvTopic{
Name: "local-topic1",
Subscriptions: []app.EnvSubsciption{
ENV_SUBSCRIPTION_QUEUE_4,
ENV_SUBSCRIPTION_QUEUE_5,
},
}

var LOCAL_ENV_TOPIC_2 = app.EnvTopic{
Name: "local-topic2",
Subscriptions: []app.EnvSubsciption(nil),
}

var LOCAL_ENV_QUEUE_1 = app.EnvQueue{
Name: "local-queue1",
ReceiveMessageWaitTimeSeconds: 0,
RedrivePolicy: "",
MaximumMessageSize: 0,
}

var LOCAL_ENV_QUEUE_2 = app.EnvQueue{
Name: "local-queue2",
ReceiveMessageWaitTimeSeconds: 20,
RedrivePolicy: "",
MaximumMessageSize: 128,
}

var LOCAL_ENV_QUEUE_3 = app.EnvQueue{
Name: "local-queue3",
ReceiveMessageWaitTimeSeconds: 0,
RedrivePolicy: "{\"maxReceiveCount\": 100, \"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:local-queue3-dlq\"}",
MaximumMessageSize: 0,
}

var LOCAL_ENV_QUEUE_3_DLQ = app.EnvQueue{
Name: "local-queue3-dlq",
ReceiveMessageWaitTimeSeconds: 0,
RedrivePolicy: "",
MaximumMessageSize: 0,
}

var DEFAULT_ENVIRONMENT = app.Environment{
Host: "localhost",
Port: "4100",
Region: "local",
AccountID: "queue",
QueueAttributeDefaults: app.EnvQueueAttributes{
VisibilityTimeout: 30,
ReceiveMessageWaitTimeSeconds: 0,
MaximumMessageSize: 262144,
},
RandomLatency: app.RandomLatency{
Min: 0,
Max: 0,
},
}

var NO_QUEUES_NO_TOPICS_ENVIRONEMENT = app.Environment{
Host: "localhost",
Port: "4100",
Region: "eu-west-1",
LogFile: "./goaws_messages.log",
AccountID: "queue",
QueueAttributeDefaults: app.EnvQueueAttributes{
VisibilityTimeout: 30,
ReceiveMessageWaitTimeSeconds: 0,
MaximumMessageSize: 262144,
},
RandomLatency: app.RandomLatency{
Min: 0,
Max: 0,
},
}

var LOCAL_ENVIRONMENT = app.Environment{
Host: "localhost",
Port: "4200",
SqsPort: "",
SnsPort: "",
Region: "us-east-1",
AccountID: "100010001000",
LogToFile: false,
LogFile: "./goaws_messages.log",
EnableDuplicates: false,
Topics: []app.EnvTopic{
LOCAL_ENV_TOPIC_1,
LOCAL_ENV_TOPIC_2,
},
Queues: []app.EnvQueue{
LOCAL_ENV_QUEUE_1,
LOCAL_ENV_QUEUE_2,
LOCAL_ENV_QUEUE_3,
LOCAL_ENV_QUEUE_3_DLQ,
},
QueueAttributeDefaults: app.EnvQueueAttributes{
VisibilityTimeout: 10,
ReceiveMessageWaitTimeSeconds: 11,
MaximumMessageSize: 1024,
MessageRetentionPeriod: 1000,
},
RandomLatency: app.RandomLatency{
Min: 0,
Max: 0,
},
}
Loading

0 comments on commit a86ab6c

Please sign in to comment.