diff --git a/app/common.go b/app/common.go index 72bad1ac2..e94c8f508 100644 --- a/app/common.go +++ b/app/common.go @@ -1,5 +1,34 @@ package app +/*** config ***/ +type EnvSubsciption struct { + QueueName string + Raw bool +} + +type EnvTopic struct { + Name string + Subscriptions []EnvSubsciption +} + +type EnvQueue struct { + Name string +} + +type Environment struct { + Host string + Port string + SqsPort string + SnsPort string + Region string + LogMessages bool + LogFile string + Topics []EnvTopic + Queues []EnvQueue +} + +var CurrentEnvironment Environment + /*** Common ***/ type ResponseMetadata struct { RequestId string `xml:"RequestId"` diff --git a/app/conf/config.go b/app/conf/config.go index 89201a2d9..1ebf6a1b9 100644 --- a/app/conf/config.go +++ b/app/conf/config.go @@ -7,44 +7,17 @@ import ( log "github.com/sirupsen/logrus" "github.com/ghodss/yaml" + "github.com/p4tin/goaws/app" "github.com/p4tin/goaws/app/common" - sns "github.com/p4tin/goaws/app/gosns" - sqs "github.com/p4tin/goaws/app/gosqs" ) -type EnvSubsciption struct { - QueueName string - Raw bool -} - -type EnvTopic struct { - Name string - Subscriptions []EnvSubsciption -} - -type EnvQueue struct { - Name string -} - -type Environment struct { - Host string - Port string - SqsPort string - SnsPort string - Region string - LogMessages bool - LogFile string - Topics []EnvTopic - Queues []EnvQueue -} - -var envs map[string]Environment +var envs map[string]app.Environment func LoadYamlConfig(filename string, env string) []string { ports := []string{"4100"} if filename == "" { - filename, _ = filepath.Abs("./conf/goaws.yaml") + filename, _ = filepath.Abs("./app/conf/goaws.yaml") } log.Warnf("Loading config file: %s", filename) yamlFile, err := ioutil.ReadFile(filename) @@ -70,8 +43,11 @@ func LoadYamlConfig(filename string, env string) []string { ports = []string{envs[env].Port} } else if envs[env].SqsPort != "" && envs[env].SnsPort != "" { ports = []string{envs[env].SqsPort, envs[env].SnsPort} + app.CurrentEnvironment.Port = envs[env].SqsPort } + app.CurrentEnvironment = envs[env] + common.LogMessages = false common.LogFile = "./goaws_messages.log" @@ -82,41 +58,39 @@ func LoadYamlConfig(filename string, env string) []string { } } - sqs.SyncQueues.Lock() - sns.SyncTopics.Lock() + app.SyncQueues.Lock() + app.SyncTopics.Lock() for _, queue := range envs[env].Queues { - queueUrl := "http://" + envs[env].Host + ":" + ports[0] + "/queue/" + queue.Name - sqs.SyncQueues.Queues[queue.Name] = &sqs.Queue{Name: queue.Name, TimeoutSecs: 30, Arn: queueUrl, URL: queueUrl} + queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/queue/" + queue.Name + queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Host + ":000000000000:" + queue.Name + app.SyncQueues.Queues[queue.Name] = &app.Queue{Name: queue.Name, TimeoutSecs: 30, Arn: queueArn, URL: queueUrl} } for _, topic := range envs[env].Topics { topicArn := "arn:aws:sns:" + region + ":000000000000:" + topic.Name - newTopic := &sns.Topic{Name: topic.Name, Arn: topicArn} - newTopic.Subscriptions = make([]*sns.Subscription, 0, 0) + newTopic := &app.Topic{Name: topic.Name, Arn: topicArn} + newTopic.Subscriptions = make([]*app.Subscription, 0, 0) for _, subs := range topic.Subscriptions { - if _, ok := sqs.SyncQueues.Queues[subs.QueueName]; !ok { + if _, ok := app.SyncQueues.Queues[subs.QueueName]; !ok { //Queue does not exist yet, create it. - queueUrl := "http://" + envs[env].Host + ":" + ports[0] + "/queue/" + subs.QueueName - sqs.SyncQueues.Queues[subs.QueueName] = &sqs.Queue{Name: subs.QueueName, TimeoutSecs: 30, Arn: queueUrl, URL: queueUrl} + queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/queue/" + subs.QueueName + queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Host + ":000000000000:" + subs.QueueName + app.SyncQueues.Queues[subs.QueueName] = &app.Queue{Name: subs.QueueName, TimeoutSecs: 30, Arn: queueArn, URL: queueUrl} } - qUrl := sqs.SyncQueues.Queues[subs.QueueName].URL - newSub := &sns.Subscription{EndPoint: qUrl, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw} + qUrl := app.SyncQueues.Queues[subs.QueueName].URL + newSub := &app.Subscription{EndPoint: qUrl, Protocol: "sqs", TopicArn: topicArn, Raw: subs.Raw} subArn, _ := common.NewUUID() subArn = topicArn + ":" + subArn newSub.SubscriptionArn = subArn newTopic.Subscriptions = append(newTopic.Subscriptions, newSub) } - sns.SyncTopics.Topics[topic.Name] = newTopic + app.SyncTopics.Topics[topic.Name] = newTopic } - sqs.SyncQueues.Unlock() - sns.SyncTopics.Unlock() + app.SyncQueues.Unlock() + app.SyncTopics.Unlock() return ports } - -func GetLogFileName(env string) (string, bool) { - return envs[env].LogFile, envs[env].LogMessages -} diff --git a/app/conf/config_test.go b/app/conf/config_test.go index 475840a00..e616a6d77 100644 --- a/app/conf/config_test.go +++ b/app/conf/config_test.go @@ -1,9 +1,9 @@ package conf import ( - "github.com/p4tin/goaws/app/gosns" - "github.com/p4tin/goaws/app/gosqs" "testing" + + "github.com/p4tin/goaws/app" ) func TestConfig_NoQueuesOrTopics(t *testing.T) { @@ -17,7 +17,7 @@ func TestConfig_NoQueuesOrTopics(t *testing.T) { if numQueues != 0 { t.Errorf("Expected zero queues to be in the environment but got %s\n", numQueues) } - numQueues = len(gosqs.SyncQueues.Queues) + numQueues = len(app.SyncQueues.Queues) if numQueues != 0 { t.Errorf("Expected zero queues to be in the sqs topics but got %s\n", numQueues) } @@ -26,7 +26,7 @@ func TestConfig_NoQueuesOrTopics(t *testing.T) { if numTopics != 0 { t.Errorf("Expected zero topics to be in the environment but got %s\n", numTopics) } - numTopics = len(gosns.SyncTopics.Topics) + numTopics = len(app.SyncTopics.Topics) if numTopics != 0 { t.Errorf("Expected zero topics to be in the sns topics but got %s\n", numTopics) } @@ -43,7 +43,7 @@ func TestConfig_CreateQueuesTopicsAndSubscriptions(t *testing.T) { if numQueues != 3 { t.Errorf("Expected three queues to be in the environment but got %s\n", numQueues) } - numQueues = len(gosqs.SyncQueues.Queues) + numQueues = len(app.SyncQueues.Queues) if numQueues != 5 { t.Errorf("Expected five queues to be in the sqs topics but got %s\n", numQueues) } @@ -52,7 +52,7 @@ func TestConfig_CreateQueuesTopicsAndSubscriptions(t *testing.T) { if numTopics != 2 { t.Errorf("Expected two topics to be in the environment but got %s\n", numTopics) } - numTopics = len(gosns.SyncTopics.Topics) + numTopics = len(app.SyncTopics.Topics) if numTopics != 2 { t.Errorf("Expected two topics to be in the sns topics but got %s\n", numTopics) } diff --git a/app/conf/goaws.yaml b/app/conf/goaws.yaml index fb8c823fc..1b1109424 100644 --- a/app/conf/goaws.yaml +++ b/app/conf/goaws.yaml @@ -1,6 +1,6 @@ Local: # Environment name that can be passed on the command line # (i.e.: ./goaws [Local | Dev] -- defaults to 'Local') - Host: localhost # hostname of the goaws system (for docker-compose this is the tag name of the container) + Host: goaws # hostname of the goaws system (for docker-compose this is the tag name of the container) # you can now use either 1 port for both sns and sqs or alternatively you can comment out Port and use SqsPort + SnsPort for compatibilyt with # yopa and (fage-sns + face-sqs). If both ways are in the config file on the one "Port" will be used by GoAws Port: 4100 # port to listen on. diff --git a/app/gosns/gosns.go b/app/gosns/gosns.go index 7c68f5d41..0337414a5 100644 --- a/app/gosns/gosns.go +++ b/app/gosns/gosns.go @@ -7,73 +7,24 @@ import ( "fmt" "net/http" "strings" - "sync" "time" log "github.com/sirupsen/logrus" "github.com/p4tin/goaws/app" "github.com/p4tin/goaws/app/common" - sqs "github.com/p4tin/goaws/app/gosqs" ) -type SnsErrorType struct { - HttpError int - Type string - Code string - Message string -} - -var SnsErrors map[string]SnsErrorType - -type Subscription struct { - TopicArn string - Protocol string - SubscriptionArn string - EndPoint string - Raw bool -} - -type Topic struct { - Name string - Arn string - Subscriptions []*Subscription -} - -type ( - Protocol string - MessageStructure string -) - -const ( - ProtocolSQS Protocol = "sqs" - ProtocolDefault Protocol = "default" -) - -const ( - MessageStructureJSON MessageStructure = "json" -) - -// Predefined errors -const ( - ErrNoDefaultElementInJSON = "Invalid parameter: Message Structure - No default entry in JSON message body" -) - -var SyncTopics = struct { - sync.RWMutex - Topics map[string]*Topic -}{Topics: make(map[string]*Topic)} - func init() { - SyncTopics.Topics = make(map[string]*Topic) - - SnsErrors = make(map[string]SnsErrorType) - err1 := SnsErrorType{HttpError: http.StatusBadRequest, Type: "Not Found", Code: "AWS.SimpleNotificationService.NonExistentTopic", Message: "The specified topic does not exist for this wsdl version."} - SnsErrors["TopicNotFound"] = err1 - err2 := SnsErrorType{HttpError: http.StatusBadRequest, Type: "Not Found", Code: "AWS.SimpleNotificationService.NonExistentSubscription", Message: "The specified subscription does not exist for this wsdl version."} - SnsErrors["SubscriptionNotFound"] = err2 - err3 := SnsErrorType{HttpError: http.StatusBadRequest, Type: "Duplicate", Code: "AWS.SimpleNotificationService.TopicAlreadyExists", Message: "The specified topic already exists."} - SnsErrors["TopicExists"] = err3 + app.SyncTopics.Topics = make(map[string]*app.Topic) + + app.SnsErrors = make(map[string]app.SnsErrorType) + err1 := app.SnsErrorType{HttpError: http.StatusBadRequest, Type: "Not Found", Code: "AWS.SimpleNotificationService.NonExistentTopic", Message: "The specified topic does not exist for this wsdl version."} + app.SnsErrors["TopicNotFound"] = err1 + err2 := app.SnsErrorType{HttpError: http.StatusBadRequest, Type: "Not Found", Code: "AWS.SimpleNotificationService.NonExistentSubscription", Message: "The specified subscription does not exist for this wsdl version."} + app.SnsErrors["SubscriptionNotFound"] = err2 + err3 := app.SnsErrorType{HttpError: http.StatusBadRequest, Type: "Duplicate", Code: "AWS.SimpleNotificationService.TopicAlreadyExists", Message: "The specified topic already exists."} + app.SnsErrors["TopicExists"] = err3 } func ListTopics(w http.ResponseWriter, req *http.Request) { @@ -86,7 +37,7 @@ func ListTopics(w http.ResponseWriter, req *http.Request) { respStruct.Result.Topics.Member = make([]app.TopicArnResult, 0, 0) log.Println("Listing Topics") - for _, topic := range SyncTopics.Topics { + for _, topic := range app.SyncTopics.Topics { ta := app.TopicArnResult{TopicArn: topic.Arn} respStruct.Result.Topics.Member = append(respStruct.Result.Topics.Member, ta) } @@ -98,17 +49,17 @@ func CreateTopic(w http.ResponseWriter, req *http.Request) { content := req.FormValue("ContentType") topicName := req.FormValue("Name") topicArn := "" - if _, ok := SyncTopics.Topics[topicName]; ok { - topicArn = SyncTopics.Topics[topicName].Arn + if _, ok := app.SyncTopics.Topics[topicName]; ok { + topicArn = app.SyncTopics.Topics[topicName].Arn } else { topicArn = "arn:aws:sns:local:000000000000:" + topicName log.Println("Creating Topic:", topicName) - topic := &Topic{Name: topicName, Arn: topicArn} - topic.Subscriptions = make([]*Subscription, 0, 0) - SyncTopics.Lock() - SyncTopics.Topics[topicName] = topic - SyncTopics.Unlock() + topic := &app.Topic{Name: topicName, Arn: topicArn} + topic.Subscriptions = make([]*app.Subscription, 0, 0) + app.SyncTopics.Lock() + app.SyncTopics.Topics[topicName] = topic + app.SyncTopics.Unlock() } uuid, _ := common.NewUUID() respStruct := app.CreateTopicResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.CreateTopicResult{TopicArn: topicArn}, app.ResponseMetadata{RequestId: uuid}} @@ -126,25 +77,25 @@ func Subscribe(w http.ResponseWriter, req *http.Request) { topicName := uriSegments[len(uriSegments)-1] log.Println("Creating Subscription from", topicName, "to", endpoint, "using protocol", protocol) - subscription := &Subscription{EndPoint: endpoint, Protocol: protocol, TopicArn: topicArn, Raw: false} + subscription := &app.Subscription{EndPoint: endpoint, Protocol: protocol, TopicArn: topicArn, Raw: false} subArn, _ := common.NewUUID() subArn = topicArn + ":" + subArn subscription.SubscriptionArn = subArn - if SyncTopics.Topics[topicName] != nil { - SyncTopics.Lock() + if app.SyncTopics.Topics[topicName] != nil { + app.SyncTopics.Lock() isDuplicate := false // Duplicate check - for _, subscription := range SyncTopics.Topics[topicName].Subscriptions { + for _, subscription := range app.SyncTopics.Topics[topicName].Subscriptions { if subscription.EndPoint == endpoint && subscription.TopicArn == topicArn { isDuplicate = true subArn = subscription.SubscriptionArn } } if !isDuplicate { - SyncTopics.Topics[topicName].Subscriptions = append(SyncTopics.Topics[topicName].Subscriptions, subscription) + app.SyncTopics.Topics[topicName].Subscriptions = append(app.SyncTopics.Topics[topicName].Subscriptions, subscription) } - SyncTopics.Unlock() + app.SyncTopics.Unlock() //Create the response uuid, _ := common.NewUUID() @@ -164,7 +115,7 @@ func ListSubscriptions(w http.ResponseWriter, req *http.Request) { respStruct.Metadata.RequestId = uuid respStruct.Result.Subscriptions.Member = make([]app.TopicMemberResult, 0, 0) - for _, topic := range SyncTopics.Topics { + for _, topic := range app.SyncTopics.Topics { for _, sub := range topic.Subscriptions { tar := app.TopicMemberResult{TopicArn: topic.Arn, Protocol: sub.Protocol, SubscriptionArn: sub.SubscriptionArn, Endpoint: sub.EndPoint} @@ -182,7 +133,7 @@ func ListSubscriptionsByTopic(w http.ResponseWriter, req *http.Request) { uriSegments := strings.Split(topicArn, ":") topicName := uriSegments[len(uriSegments)-1] - if topic, ok := SyncTopics.Topics[topicName]; ok { + if topic, ok := app.SyncTopics.Topics[topicName]; ok { uuid, _ := common.NewUUID() respStruct := app.ListSubscriptionsByTopicResponse{} respStruct.Xmlns = "http://queue.amazonaws.com/doc/2012-11-05/" @@ -206,17 +157,17 @@ func SetSubscriptionAttributes(w http.ResponseWriter, req *http.Request) { Attribute := req.FormValue("AttributeName") Value := req.FormValue("AttributeValue") - for _, topic := range SyncTopics.Topics { + for _, topic := range app.SyncTopics.Topics { for _, sub := range topic.Subscriptions { if sub.SubscriptionArn == subsArn { if Attribute == "RawMessageDelivery" { - SyncTopics.Lock() + app.SyncTopics.Lock() if Value == "true" { sub.Raw = true } else { sub.Raw = false } - SyncTopics.Unlock() + app.SyncTopics.Unlock() //Good Response == return uuid, _ := common.NewUUID() respStruct := app.SetSubscriptionAttributesResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: uuid}} @@ -234,16 +185,16 @@ func Unsubscribe(w http.ResponseWriter, req *http.Request) { subArn := req.FormValue("SubscriptionArn") log.Println("Unsubcribing:", subArn) - for _, topic := range SyncTopics.Topics { + for _, topic := range app.SyncTopics.Topics { for i, sub := range topic.Subscriptions { if sub.SubscriptionArn == subArn { - SyncTopics.Lock() + app.SyncTopics.Lock() copy(topic.Subscriptions[i:], topic.Subscriptions[i+1:]) topic.Subscriptions[len(topic.Subscriptions)-1] = nil topic.Subscriptions = topic.Subscriptions[:len(topic.Subscriptions)-1] - SyncTopics.Unlock() + app.SyncTopics.Unlock() uuid, _ := common.NewUUID() respStruct := app.UnsubscribeResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: uuid}} @@ -264,11 +215,11 @@ func DeleteTopic(w http.ResponseWriter, req *http.Request) { log.Println("Delete Topic - TopicName:", topicName) - _, ok := SyncTopics.Topics[topicName] + _, ok := app.SyncTopics.Topics[topicName] if ok { - SyncTopics.Lock() - delete(SyncTopics.Topics, topicName) - SyncTopics.Unlock() + app.SyncTopics.Lock() + delete(app.SyncTopics.Topics, topicName) + app.SyncTopics.Unlock() uuid, _ := common.NewUUID() respStruct := app.DeleteTopicResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: uuid}} SendResponseBack(w, req, respStruct, content) @@ -289,21 +240,21 @@ func Publish(w http.ResponseWriter, req *http.Request) { uriSegments := strings.Split(topicArn, ":") topicName := uriSegments[len(uriSegments)-1] - _, ok := SyncTopics.Topics[topicName] + _, ok := app.SyncTopics.Topics[topicName] if ok { log.Println("Publish to Topic:", topicName) - for _, subs := range SyncTopics.Topics[topicName].Subscriptions { - if Protocol(subs.Protocol) == ProtocolSQS { + for _, subs := range app.SyncTopics.Topics[topicName].Subscriptions { + if app.Protocol(subs.Protocol) == app.ProtocolSQS { queueUrl := subs.EndPoint uriSegments := strings.Split(queueUrl, "/") queueName := uriSegments[len(uriSegments)-1] - if _, ok := sqs.SyncQueues.Queues[queueName]; ok { + if _, ok := app.SyncQueues.Queues[queueName]; ok { parts := strings.Split(queueName, ":") if len(parts) > 0 { queueName = parts[len(parts)-1] } - msg := sqs.Message{} + msg := app.Message{} if subs.Raw == false { m, err := CreateMessageBody(messageBody, subject, topicArn, subs.Protocol, messageStructure) if err != nil { @@ -318,9 +269,9 @@ func Publish(w http.ResponseWriter, req *http.Request) { msg.MD5OfMessageAttributes = common.GetMD5Hash("GoAws") msg.MD5OfMessageBody = common.GetMD5Hash(messageBody) msg.Uuid, _ = common.NewUUID() - sqs.SyncQueues.Lock() - sqs.SyncQueues.Queues[queueName].Messages = append(sqs.SyncQueues.Queues[queueName].Messages, msg) - sqs.SyncQueues.Unlock() + app.SyncQueues.Lock() + app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg) + app.SyncQueues.Unlock() common.LogMessage(fmt.Sprintf("%s: Topic: %s(%s), Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), topicName, queueName, msg.MessageBody)) } else { common.LogMessage(fmt.Sprintf("%s: Queue %s does not exits, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName)) @@ -355,7 +306,7 @@ func CreateMessageBody(msg string, subject string, topicArn string, protocol str message.Type = "Notification" message.Subject = subject - if MessageStructure(messageStructure) == MessageStructureJSON { + if app.MessageStructure(messageStructure) == app.MessageStructureJSON { m, err := extractMessageFromJSON(msg, protocol) if err != nil { return nil, err @@ -380,9 +331,9 @@ func extractMessageFromJSON(msg string, protocol string) (string, error) { return "", err } - defaultMsg, ok := msgWithProtocols[string(ProtocolDefault)] + defaultMsg, ok := msgWithProtocols[string(app.ProtocolDefault)] if !ok { - return "", errors.New(ErrNoDefaultElementInJSON) + return "", errors.New(app.ErrNoDefaultElementInJSON) } if m, ok := msgWithProtocols[protocol]; ok { @@ -393,7 +344,7 @@ func extractMessageFromJSON(msg string, protocol string) (string, error) { } func createErrorResponse(w http.ResponseWriter, req *http.Request, err string) { - er := SnsErrors[err] + er := app.SnsErrors[err] respStruct := app.ErrorResponse{app.ErrorResult{Type: er.Type, Code: er.Code, Message: er.Message, RequestId: "00000000-0000-0000-0000-000000000000"}} w.WriteHeader(er.HttpError) diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index 1639659d7..000f1105e 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -7,7 +7,6 @@ import ( "net/url" "strconv" "strings" - "sync" "time" log "github.com/sirupsen/logrus" @@ -17,51 +16,18 @@ import ( "github.com/p4tin/goaws/app/common" ) -type SqsErrorType struct { - HttpError int - Type string - Code string - Message string -} - -var SqsErrors map[string]SqsErrorType - -type Message struct { - MessageBody []byte - Uuid string - MD5OfMessageAttributes string - MD5OfMessageBody string - ReceiptHandle string - ReceiptTime time.Time -} - -type Queue struct { - Name string - URL string - Arn string - TimeoutSecs int - Messages []Message -} - -var SyncQueues = struct { - sync.RWMutex - Queues map[string]*Queue -}{Queues: make(map[string]*Queue)} - -//var Queues map[string]*Queue - func init() { - SyncQueues.Queues = make(map[string]*Queue) - - SqsErrors = make(map[string]SqsErrorType) - err1 := SqsErrorType{HttpError: http.StatusBadRequest, Type: "Not Found", Code: "AWS.SimpleQueueService.NonExistentQueue", Message: "The specified queue does not exist for this wsdl version."} - SqsErrors["QueueNotFound"] = err1 - err2 := SqsErrorType{HttpError: http.StatusBadRequest, Type: "Duplicate", Code: "AWS.SimpleQueueService.QueueExists", Message: "The specified queue already exists."} - SqsErrors["QueueExists"] = err2 - err3 := SqsErrorType{HttpError: http.StatusNotFound, Type: "Not Found", Code: "AWS.SimpleQueueService.QueueExists", Message: "The specified queue does not contain the message specified."} - SqsErrors["MessageDoesNotExist"] = err3 - err4 := SqsErrorType{HttpError: http.StatusBadRequest, Type: "GeneralError", Code: "AWS.SimpleQueueService.GeneralError", Message: "General Error."} - SqsErrors["GeneralError"] = err4 + app.SyncQueues.Queues = make(map[string]*app.Queue) + + app.SqsErrors = make(map[string]app.SqsErrorType) + err1 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "Not Found", Code: "AWS.SimpleQueueService.NonExistentQueue", Message: "The specified queue does not exist for this wsdl version."} + app.SqsErrors["QueueNotFound"] = err1 + err2 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "Duplicate", Code: "AWS.SimpleQueueService.QueueExists", Message: "The specified queue already exists."} + app.SqsErrors["QueueExists"] = err2 + err3 := app.SqsErrorType{HttpError: http.StatusNotFound, Type: "Not Found", Code: "AWS.SimpleQueueService.QueueExists", Message: "The specified queue does not contain the message specified."} + app.SqsErrors["MessageDoesNotExist"] = err3 + err4 := app.SqsErrorType{HttpError: http.StatusBadRequest, Type: "GeneralError", Code: "AWS.SimpleQueueService.GeneralError", Message: "General Error."} + app.SqsErrors["GeneralError"] = err4 } func ListQueues(w http.ResponseWriter, req *http.Request) { @@ -72,10 +38,10 @@ func ListQueues(w http.ResponseWriter, req *http.Request) { respStruct.Result.QueueUrl = make([]string, 0) log.Println("Listing Queues") - for _, queue := range SyncQueues.Queues { - SyncQueues.Lock() + for _, queue := range app.SyncQueues.Queues { + app.SyncQueues.Lock() respStruct.Result.QueueUrl = append(respStruct.Result.QueueUrl, queue.URL) - SyncQueues.Unlock() + app.SyncQueues.Unlock() } enc := xml.NewEncoder(w) enc.Indent(" ", " ") @@ -87,14 +53,16 @@ func ListQueues(w http.ResponseWriter, req *http.Request) { func CreateQueue(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/xml") queueName := req.FormValue("QueueName") - queueUrl := "http://" + req.Host + "/queue/" + queueName + host := app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + queueUrl := "http://" + host + "/queue/" + queueName + queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Host + ":000000000000:" + queueName - if _, ok := SyncQueues.Queues[queueName]; !ok { + if _, ok := app.SyncQueues.Queues[queueName]; !ok { log.Println("Creating Queue:", queueName) - queue := &Queue{Name: queueName, URL: queueUrl, Arn: queueUrl, TimeoutSecs: 30} - SyncQueues.Lock() - SyncQueues.Queues[queueName] = queue - SyncQueues.Unlock() + queue := &app.Queue{Name: queueName, URL: queueUrl, Arn: queueArn, TimeoutSecs: 30} + app.SyncQueues.Lock() + app.SyncQueues.Queues[queueName] = queue + app.SyncQueues.Unlock() } respStruct := app.CreateQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.CreateQueueResult{QueueUrl: queueUrl}, app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}} @@ -121,20 +89,20 @@ func SendMessage(w http.ResponseWriter, req *http.Request) { queueName = uriSegments[len(uriSegments)-1] } - if _, ok := SyncQueues.Queues[queueName]; !ok { + if _, ok := app.SyncQueues.Queues[queueName]; !ok { // Queue does not exists createErrorResponse(w, req, "QueueNotFound") return } log.Println("Putting Message in Queue:", queueName) - msg := Message{MessageBody: []byte(messageBody)} + msg := app.Message{MessageBody: []byte(messageBody)} msg.MD5OfMessageAttributes = hashAttributes(messageAttributes) msg.MD5OfMessageBody = common.GetMD5Hash(messageBody) msg.Uuid, _ = common.NewUUID() - SyncQueues.Lock() - SyncQueues.Queues[queueName].Messages = append(SyncQueues.Queues[queueName].Messages, msg) - SyncQueues.Unlock() + app.SyncQueues.Lock() + app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg) + app.SyncQueues.Unlock() common.LogMessage(fmt.Sprintf("%s: Queue: %s, Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)) respStruct := app.SendMessageResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.SendMessageResult{MD5OfMessageAttributes: msg.MD5OfMessageAttributes, MD5OfMessageBody: msg.MD5OfMessageBody, MessageId: msg.Uuid}, app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}} @@ -170,7 +138,7 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) { queueName = uriSegments[len(uriSegments)-1] } - if _, ok := SyncQueues.Queues[queueName]; !ok { + if _, ok := app.SyncQueues.Queues[queueName]; !ok { createErrorResponse(w, req, "QueueNotFound") return } @@ -180,33 +148,33 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) { respStruct := app.ReceiveMessageResponse{} loops := waitTimeSeconds * 10 - for len(SyncQueues.Queues[queueName].Messages)-numberOfHiddenMessagesInQueue(*SyncQueues.Queues[queueName]) == 0 && loops > 0 { + for len(app.SyncQueues.Queues[queueName].Messages)-numberOfHiddenMessagesInQueue(*app.SyncQueues.Queues[queueName]) == 0 && loops > 0 { time.Sleep(100 * time.Millisecond) loops-- } log.Println("Getting Message from Queue:", queueName) - if len(SyncQueues.Queues[queueName].Messages) > 0 { + if len(app.SyncQueues.Queues[queueName].Messages) > 0 { numMsg := 0 message = make([]*app.ResultMessage, 0) - for i := range SyncQueues.Queues[queueName].Messages { + for i := range app.SyncQueues.Queues[queueName].Messages { if numMsg >= maxNumberOfMessages { break } - timeout := time.Now().Add(time.Duration(-SyncQueues.Queues[queueName].TimeoutSecs) * time.Second) - if (SyncQueues.Queues[queueName].Messages[i].ReceiptHandle != "") && (timeout.Before(SyncQueues.Queues[queueName].Messages[i].ReceiptTime)) { + timeout := time.Now().Add(time.Duration(-app.SyncQueues.Queues[queueName].TimeoutSecs) * time.Second) + if (app.SyncQueues.Queues[queueName].Messages[i].ReceiptHandle != "") && (timeout.Before(app.SyncQueues.Queues[queueName].Messages[i].ReceiptTime)) { continue } else { - SyncQueues.Lock() // Lock the Queues + app.SyncQueues.Lock() // Lock the Queues uuid, _ := common.NewUUID() - SyncQueues.Queues[queueName].Messages[i].ReceiptHandle = SyncQueues.Queues[queueName].Messages[i].Uuid + "#" + uuid - SyncQueues.Queues[queueName].Messages[i].ReceiptTime = time.Now() + app.SyncQueues.Queues[queueName].Messages[i].ReceiptHandle = app.SyncQueues.Queues[queueName].Messages[i].Uuid + "#" + uuid + app.SyncQueues.Queues[queueName].Messages[i].ReceiptTime = time.Now() message = append(message, &app.ResultMessage{}) - message[numMsg].MessageId = SyncQueues.Queues[queueName].Messages[i].Uuid - message[numMsg].Body = SyncQueues.Queues[queueName].Messages[i].MessageBody - message[numMsg].ReceiptHandle = SyncQueues.Queues[queueName].Messages[i].ReceiptHandle + message[numMsg].MessageId = app.SyncQueues.Queues[queueName].Messages[i].Uuid + message[numMsg].Body = app.SyncQueues.Queues[queueName].Messages[i].MessageBody + message[numMsg].ReceiptHandle = app.SyncQueues.Queues[queueName].Messages[i].ReceiptHandle message[numMsg].MD5OfBody = common.GetMD5Hash(string(message[numMsg].Body)) - SyncQueues.Unlock() // Unlock the Queues + app.SyncQueues.Unlock() // Unlock the Queues numMsg++ } } @@ -224,7 +192,7 @@ func ReceiveMessage(w http.ResponseWriter, req *http.Request) { } } -func numberOfHiddenMessagesInQueue(queue Queue) int { +func numberOfHiddenMessagesInQueue(queue app.Queue) int { num := 0 for i := range queue.Messages { timeout := time.Now().Add(time.Duration(-queue.TimeoutSecs) * time.Second) @@ -236,10 +204,10 @@ func numberOfHiddenMessagesInQueue(queue Queue) int { } type DeleteEntry struct { - Id string + Id string ReceiptHandle string - Error string - Deleted bool + Error string + Deleted bool } func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { @@ -286,12 +254,12 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { deletedEntries := make([]app.DeleteMessageBatchResultEntry, 0) - SyncQueues.Lock() - if _, ok := SyncQueues.Queues[queueName]; ok { - for i, msg := range SyncQueues.Queues[queueName].Messages { + app.SyncQueues.Lock() + if _, ok := app.SyncQueues.Queues[queueName]; ok { + for i, msg := range app.SyncQueues.Queues[queueName].Messages { for _, deleteEntry := range deleteEntries { if msg.ReceiptHandle == deleteEntry.ReceiptHandle { - SyncQueues.Queues[queueName].Messages = append(SyncQueues.Queues[queueName].Messages[:i], SyncQueues.Queues[queueName].Messages[i+1:]...) + app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...) deleteEntry.Deleted = true deletedEntry := app.DeleteMessageBatchResultEntry{Id: deleteEntry.Id} @@ -300,15 +268,15 @@ func DeleteMessageBatch(w http.ResponseWriter, req *http.Request) { } } } - SyncQueues.Unlock() + app.SyncQueues.Unlock() notFoundEntries := make([]app.BatchResultErrorEntry, 0) for _, deleteEntry := range deleteEntries { if deleteEntry.Deleted == false { notFoundEntries = append(notFoundEntries, app.BatchResultErrorEntry{ - Code: "1", - Id: deleteEntry.Id, - Message: "Message not found", + Code: "1", + Id: deleteEntry.Id, + Message: "Message not found", SenderFault: true}) } } @@ -346,14 +314,14 @@ func DeleteMessage(w http.ResponseWriter, req *http.Request) { log.Println("Deleting Message, Queue:", queueName, ", ReceiptHandle:", receiptHandle) // Find queue/message with the receipt handle and delete - SyncQueues.Lock() - if _, ok := SyncQueues.Queues[queueName]; ok { - for i, msg := range SyncQueues.Queues[queueName].Messages { + app.SyncQueues.Lock() + if _, ok := app.SyncQueues.Queues[queueName]; ok { + for i, msg := range app.SyncQueues.Queues[queueName].Messages { if msg.ReceiptHandle == receiptHandle { //Delete message from Q - SyncQueues.Queues[queueName].Messages = append(SyncQueues.Queues[queueName].Messages[:i], SyncQueues.Queues[queueName].Messages[i+1:]...) + app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages[:i], app.SyncQueues.Queues[queueName].Messages[i+1:]...) - SyncQueues.Unlock() + app.SyncQueues.Unlock() // Create, encode/xml and send response respStruct := app.DeleteMessageResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000001"}} enc := xml.NewEncoder(w) @@ -368,7 +336,7 @@ func DeleteMessage(w http.ResponseWriter, req *http.Request) { } else { log.Println("Queue not found") } - SyncQueues.Unlock() + app.SyncQueues.Unlock() createErrorResponse(w, req, "MessageDoesNotExist") } @@ -389,9 +357,9 @@ func DeleteQueue(w http.ResponseWriter, req *http.Request) { } log.Println("Deleting Queue:", queueName) - SyncQueues.Lock() - delete(SyncQueues.Queues, queueName) - SyncQueues.Unlock() + app.SyncQueues.Lock() + delete(app.SyncQueues.Queues, queueName) + app.SyncQueues.Unlock() // Create, encode/xml and send response respStruct := app.DeleteMessageResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}} @@ -414,9 +382,9 @@ func PurgeQueue(w http.ResponseWriter, req *http.Request) { log.Println("Purging Queue:", queueName) - SyncQueues.Lock() - if _, ok := SyncQueues.Queues[queueName]; ok { - SyncQueues.Queues[queueName].Messages = nil + app.SyncQueues.Lock() + if _, ok := app.SyncQueues.Queues[queueName]; ok { + app.SyncQueues.Queues[queueName].Messages = nil respStruct := app.PurgeQueueResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: "00000000-0000-0000-0000-000000000000"}} enc := xml.NewEncoder(w) enc.Indent(" ", " ") @@ -428,7 +396,7 @@ func PurgeQueue(w http.ResponseWriter, req *http.Request) { log.Println("Purge Queue:", queueName, ", queue does not exist!!!") createErrorResponse(w, req, "QueueNotFound") } - SyncQueues.Unlock() + app.SyncQueues.Unlock() } func GetQueueUrl(w http.ResponseWriter, req *http.Request) { @@ -437,7 +405,7 @@ func GetQueueUrl(w http.ResponseWriter, req *http.Request) { // //// Retrieve FormValues required queueName := req.FormValue("QueueName") - if queue, ok := SyncQueues.Queues[queueName]; ok { + if queue, ok := app.SyncQueues.Queues[queueName]; ok { url := queue.URL log.Println("Get Queue URL:", queueName) // Create, encode/xml and send response @@ -470,7 +438,7 @@ func GetQueueAttributes(w http.ResponseWriter, req *http.Request) { } log.Println("Get Queue Attributes:", queueName) - if queue, ok := SyncQueues.Queues[queueName]; ok { + if queue, ok := app.SyncQueues.Queues[queueName]; ok { // Create, encode/xml and send response attribs := make([]app.Attribute, 0, 0) attr := app.Attribute{Name: "VisibilityTimeout", Value: strconv.Itoa(queue.TimeoutSecs)} @@ -526,7 +494,7 @@ func getQueueFromPath(formVal string, theUrl string) string { } func createErrorResponse(w http.ResponseWriter, req *http.Request, err string) { - er := SqsErrors[err] + er := app.SqsErrors[err] respStruct := app.ErrorResponse{app.ErrorResult{Type: er.Type, Code: er.Code, Message: er.Message, RequestId: "00000000-0000-0000-0000-000000000000"}} w.WriteHeader(er.HttpError) diff --git a/app/sns.go b/app/sns.go index 77b463567..593547ba9 100644 --- a/app/sns.go +++ b/app/sns.go @@ -1,108 +1,50 @@ package app -/*** List Topics Response */ -type TopicArnResult struct { - TopicArn string `xml:"TopicArn"` -} - -type TopicNamestype struct { - Member []TopicArnResult `xml:"member"` -} - -type ListTopicsResult struct { - Topics TopicNamestype `xml:"Topics"` -} - -type ListTopicsResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result ListTopicsResult `xml:"ListTopicsResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** Create Topic Response */ -type CreateTopicResult struct { - TopicArn string `xml:"TopicArn"` -} - -type CreateTopicResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result CreateTopicResult `xml:"CreateTopicResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** Create Subscription ***/ -type SubscribeResult struct { - SubscriptionArn string `xml:"SubscriptionArn"` -} - -type SubscribeResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result SubscribeResult `xml:"SubscribeResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** Set Subscription Response ***/ - -type SetSubscriptionAttributesResponse struct { - Xmlns string `xml:"xmlns,attr"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** List Subscriptions Response */ -type TopicMemberResult struct { - TopicArn string `xml:"TopicArn"` - Protocol string `xml:"Protocol"` - SubscriptionArn string `xml:"SubscriptionArn"` - Owner string `xml:"Owner"` - Endpoint string `xml:"Endpoint"` -} - -type TopicSubscriptions struct { - Member []TopicMemberResult `xml:"member"` -} - -type ListSubscriptionsResult struct { - Subscriptions TopicSubscriptions `xml:"Subscriptions"` -} +import "sync" -type ListSubscriptionsResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result ListSubscriptionsResult `xml:"ListSubscriptionsResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` +type SnsErrorType struct { + HttpError int + Type string + Code string + Message string } -/*** List Subscriptions By Topic Response */ +var SnsErrors map[string]SnsErrorType -type ListSubscriptionsByTopicResult struct { - Subscriptions TopicSubscriptions `xml:"Subscriptions"` +type Subscription struct { + TopicArn string + Protocol string + SubscriptionArn string + EndPoint string + Raw bool } -type ListSubscriptionsByTopicResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result ListSubscriptionsResult `xml:"ListSubscriptionsResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` +type Topic struct { + Name string + Arn string + Subscriptions []*Subscription } -/*** Publish ***/ +type ( + Protocol string + MessageStructure string +) -type PublishResult struct { - MessageId string `xml:"MessageId"` -} +const ( + ProtocolSQS Protocol = "sqs" + ProtocolDefault Protocol = "default" +) -type PublishResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result PublishResult `xml:"PublishResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} +const ( + MessageStructureJSON MessageStructure = "json" +) -/*** Unsubscribe ***/ -type UnsubscribeResponse struct { - Xmlns string `xml:"xmlns,attr"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} +// Predefined errors +const ( + ErrNoDefaultElementInJSON = "Invalid parameter: Message Structure - No default entry in JSON message body" +) -/*** Delete Topic ***/ -type DeleteTopicResponse struct { - Xmlns string `xml:"xmlns,attr"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} +var SyncTopics = struct { + sync.RWMutex + Topics map[string]*Topic +}{Topics: make(map[string]*Topic)} diff --git a/app/sns_messages.go b/app/sns_messages.go new file mode 100644 index 000000000..77b463567 --- /dev/null +++ b/app/sns_messages.go @@ -0,0 +1,108 @@ +package app + +/*** List Topics Response */ +type TopicArnResult struct { + TopicArn string `xml:"TopicArn"` +} + +type TopicNamestype struct { + Member []TopicArnResult `xml:"member"` +} + +type ListTopicsResult struct { + Topics TopicNamestype `xml:"Topics"` +} + +type ListTopicsResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result ListTopicsResult `xml:"ListTopicsResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Create Topic Response */ +type CreateTopicResult struct { + TopicArn string `xml:"TopicArn"` +} + +type CreateTopicResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result CreateTopicResult `xml:"CreateTopicResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Create Subscription ***/ +type SubscribeResult struct { + SubscriptionArn string `xml:"SubscriptionArn"` +} + +type SubscribeResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result SubscribeResult `xml:"SubscribeResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Set Subscription Response ***/ + +type SetSubscriptionAttributesResponse struct { + Xmlns string `xml:"xmlns,attr"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** List Subscriptions Response */ +type TopicMemberResult struct { + TopicArn string `xml:"TopicArn"` + Protocol string `xml:"Protocol"` + SubscriptionArn string `xml:"SubscriptionArn"` + Owner string `xml:"Owner"` + Endpoint string `xml:"Endpoint"` +} + +type TopicSubscriptions struct { + Member []TopicMemberResult `xml:"member"` +} + +type ListSubscriptionsResult struct { + Subscriptions TopicSubscriptions `xml:"Subscriptions"` +} + +type ListSubscriptionsResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result ListSubscriptionsResult `xml:"ListSubscriptionsResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** List Subscriptions By Topic Response */ + +type ListSubscriptionsByTopicResult struct { + Subscriptions TopicSubscriptions `xml:"Subscriptions"` +} + +type ListSubscriptionsByTopicResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result ListSubscriptionsResult `xml:"ListSubscriptionsResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Publish ***/ + +type PublishResult struct { + MessageId string `xml:"MessageId"` +} + +type PublishResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result PublishResult `xml:"PublishResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Unsubscribe ***/ +type UnsubscribeResponse struct { + Xmlns string `xml:"xmlns,attr"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Delete Topic ***/ +type DeleteTopicResponse struct { + Xmlns string `xml:"xmlns,attr"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} diff --git a/app/sqs.go b/app/sqs.go index 889777527..079286523 100644 --- a/app/sqs.go +++ b/app/sqs.go @@ -1,126 +1,37 @@ package app -/*** List Queues Response */ -type ListQueuesResult struct { - QueueUrl []string `xml:"QueueUrl"` -} - -type ListQueuesResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result ListQueuesResult `xml:"ListQueuesResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** Create Queue Response */ -type CreateQueueResult struct { - QueueUrl string `xml:"QueueUrl"` -} - -type CreateQueueResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result CreateQueueResult `xml:"CreateQueueResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** Send Message Response */ - -type SendMessageResult struct { - MD5OfMessageAttributes string `xml:"MD5OfMessageAttributes"` - MD5OfMessageBody string `xml:"MD5OfMessageBody"` - MessageId string `xml:"MessageId"` -} +import ( + "sync" + "time" +) -type SendMessageResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result SendMessageResult `xml:"SendMessageResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` +type SqsErrorType struct { + HttpError int + Type string + Code string + Message string } -/*** Receive Message Response */ - -type ResultMessage struct { - MessageId string `xml:"MessageId,omitempty"` - ReceiptHandle string `xml:"ReceiptHandle,omitempty"` - MD5OfBody string `xml:"MD5OfBody,omitempty"` - Body []byte `xml:"Body,omitempty"` - MD5OfMessageAttributes string `xml:"MD5OfMessageAttributes,omitempty"` -} - -type ReceiveMessageResult struct { - Message []*ResultMessage `xml:"Message,omitempty"` -} - -type ReceiveMessageResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result ReceiveMessageResult `xml:"ReceiveMessageResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - -/*** Delete Message Response */ -type DeleteMessageResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} +var SqsErrors map[string]SqsErrorType -type DeleteMessageBatchResultEntry struct { - Id string `xml:"Id"` +type Message struct { + MessageBody []byte + Uuid string + MD5OfMessageAttributes string + MD5OfMessageBody string + ReceiptHandle string + ReceiptTime time.Time } -type BatchResultErrorEntry struct { - Code string `xml:"Code"` - Id string `xml:"Id"` - Message string `xml:"Message,omitempty"` - SenderFault bool `xml:"SenderFault"` +type Queue struct { + Name string + URL string + Arn string + TimeoutSecs int + Messages []Message } -type DeleteMessageBatchResult struct { - Entry []DeleteMessageBatchResultEntry `xml:"DeleteMessageBatchResultEntry"` - Error []BatchResultErrorEntry `xml:"BatchResultErrorEntry,omitempty"` -} - -/*** Delete Message Batch Response */ -type DeleteMessageBatchResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Result DeleteMessageBatchResult `xml:"DeleteMessageBatchResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} - -/*** Purge Queue Response */ -type PurgeQueueResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} - -/*** Get Queue Url Response */ -type GetQueueUrlResult struct { - QueueUrl string `xml:"QueueUrl,omitempty"` -} - -type GetQueueUrlResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Result GetQueueUrlResult `xml:"GetQueueUrlResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} - -/*** Get Queue Attributes ***/ -type Attribute struct { - Name string `xml:"Name,omitempty"` - Value string `xml:"Value,omitempty"` -} - -type GetQueueAttributesResult struct { - /* VisibilityTimeout, DelaySeconds, ReceiveMessageWaitTimeSeconds, ApproximateNumberOfMessages - ApproximateNumberOfMessagesNotVisible, CreatedTimestamp, LastModifiedTimestamp, QueueArn */ - Attrs []Attribute `xml:"Attribute,omitempty"` -} - -type GetQueueAttributesResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Result GetQueueAttributesResult `xml:"GetQueueAttributesResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} - -type SetQueueAttributesResponse struct { - Xmlns string `xml:"xmlns,attr,omitempty"` - Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` -} +var SyncQueues = struct { + sync.RWMutex + Queues map[string]*Queue +}{Queues: make(map[string]*Queue)} diff --git a/app/sqs_messages.go b/app/sqs_messages.go new file mode 100644 index 000000000..bc257e6dc --- /dev/null +++ b/app/sqs_messages.go @@ -0,0 +1,126 @@ +package app + +/*** List Queues Response */ +type ListQueuesResult struct { + QueueUrl []string `xml:"QueueUrl"` +} + +type ListQueuesResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result ListQueuesResult `xml:"ListQueuesResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Create Queue Response */ +type CreateQueueResult struct { + QueueUrl string `xml:"QueueUrl"` +} + +type CreateQueueResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result CreateQueueResult `xml:"CreateQueueResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Send Message Response */ + +type SendMessageResult struct { + MD5OfMessageAttributes string `xml:"MD5OfMessageAttributes"` + MD5OfMessageBody string `xml:"MD5OfMessageBody"` + MessageId string `xml:"MessageId"` +} + +type SendMessageResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result SendMessageResult `xml:"SendMessageResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Receive Message Response */ + +type ResultMessage struct { + MessageId string `xml:"MessageId,omitempty"` + ReceiptHandle string `xml:"ReceiptHandle,omitempty"` + MD5OfBody string `xml:"MD5OfBody,omitempty"` + Body []byte `xml:"Body,omitempty"` + MD5OfMessageAttributes string `xml:"MD5OfMessageAttributes,omitempty"` +} + +type ReceiveMessageResult struct { + Message []*ResultMessage `xml:"Message,omitempty"` +} + +type ReceiveMessageResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result ReceiveMessageResult `xml:"ReceiveMessageResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata"` +} + +/*** Delete Message Response */ +type DeleteMessageResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +type DeleteMessageBatchResultEntry struct { + Id string `xml:"Id"` +} + +type BatchResultErrorEntry struct { + Code string `xml:"Code"` + Id string `xml:"Id"` + Message string `xml:"Message,omitempty"` + SenderFault bool `xml:"SenderFault"` +} + +type DeleteMessageBatchResult struct { + Entry []DeleteMessageBatchResultEntry `xml:"DeleteMessageBatchResultEntry"` + Error []BatchResultErrorEntry `xml:"BatchResultErrorEntry,omitempty"` +} + +/*** Delete Message Batch Response */ +type DeleteMessageBatchResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Result DeleteMessageBatchResult `xml:"DeleteMessageBatchResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +/*** Purge Queue Response */ +type PurgeQueueResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +/*** Get Queue Url Response */ +type GetQueueUrlResult struct { + QueueUrl string `xml:"QueueUrl,omitempty"` +} + +type GetQueueUrlResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Result GetQueueUrlResult `xml:"GetQueueUrlResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +/*** Get Queue Attributes ***/ +type Attribute struct { + Name string `xml:"Name,omitempty"` + Value string `xml:"Value,omitempty"` +} + +type GetQueueAttributesResult struct { + /* VisibilityTimeout, DelaySeconds, ReceiveMessageWaitTimeSeconds, ApproximateNumberOfMessages + ApproximateNumberOfMessagesNotVisible, CreatedTimestamp, LastModifiedTimestamp, QueueArn */ + Attrs []Attribute `xml:"Attribute,omitempty"` +} + +type GetQueueAttributesResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Result GetQueueAttributesResult `xml:"GetQueueAttributesResult"` + Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` +} + +type SetQueueAttributesResponse struct { + Xmlns string `xml:"xmlns,attr,omitempty"` + Metadata ResponseMetadata `xml:"ResponseMetadata,omitempty"` +}