Skip to content

Commit

Permalink
add set subscription attributes v1
Browse files Browse the repository at this point in the history
  • Loading branch information
kojisaikiAtSony authored and Admiral-Piett committed Sep 20, 2024
1 parent ff7a55e commit 8a8707a
Show file tree
Hide file tree
Showing 10 changed files with 620 additions and 165 deletions.
74 changes: 34 additions & 40 deletions app/gosns/get_subscription_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,41 @@ func GetSubscriptionAttributesV1(req *http.Request) (int, interfaces.AbstractRes
return utils.CreateErrorResponseV1("InvalidParameterValue", false)
}

subscriptionArn := requestBody.SubscriptionArn

for _, topic := range app.SyncTopics.Topics {
for _, sub := range topic.Subscriptions {
if sub.SubscriptionArn == subscriptionArn {

entries := make([]models.SubscriptionAttributeEntry, 0, 0)
entry := models.SubscriptionAttributeEntry{Key: "Owner", Value: app.CurrentEnvironment.AccountID}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "RawMessageDelivery", Value: strconv.FormatBool(sub.Raw)}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "TopicArn", Value: sub.TopicArn}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "Endpoint", Value: sub.EndPoint}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "PendingConfirmation", Value: "false"}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "ConfirmationWasAuthenticated", Value: "true"}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "SubscriptionArn", Value: sub.SubscriptionArn}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "Protocol", Value: sub.Protocol}
entries = append(entries, entry)

if sub.FilterPolicy != nil {
filterPolicyBytes, _ := json.Marshal(sub.FilterPolicy)
entry = models.SubscriptionAttributeEntry{Key: "FilterPolicy", Value: string(filterPolicyBytes)}
entries = append(entries, entry)
}
sub := getSubscription(requestBody.SubscriptionArn)
if sub == nil {
return utils.CreateErrorResponseV1("SubscriptionNotFound", false)
}

result := models.GetSubscriptionAttributesResult{Attributes: models.GetSubscriptionAttributes{Entries: entries}}
uuid := uuid.NewString()
respStruct := models.GetSubscriptionAttributesResponse{
Xmlns: models.BASE_XMLNS,
Result: result,
Metadata: app.ResponseMetadata{RequestId: uuid}}
entries := make([]models.SubscriptionAttributeEntry, 0, 0)
entry := models.SubscriptionAttributeEntry{Key: "Owner", Value: app.CurrentEnvironment.AccountID}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "RawMessageDelivery", Value: strconv.FormatBool(sub.Raw)}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "TopicArn", Value: sub.TopicArn}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "Endpoint", Value: sub.EndPoint}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "PendingConfirmation", Value: "false"}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "ConfirmationWasAuthenticated", Value: "true"}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "SubscriptionArn", Value: sub.SubscriptionArn}
entries = append(entries, entry)
entry = models.SubscriptionAttributeEntry{Key: "Protocol", Value: sub.Protocol}
entries = append(entries, entry)

if sub.FilterPolicy != nil {
filterPolicyBytes, _ := json.Marshal(sub.FilterPolicy)
entry = models.SubscriptionAttributeEntry{Key: "FilterPolicy", Value: string(filterPolicyBytes)}
entries = append(entries, entry)
}

return http.StatusOK, respStruct
result := models.GetSubscriptionAttributesResult{Attributes: models.GetSubscriptionAttributes{Entries: entries}}
uuid := uuid.NewString()
respStruct := models.GetSubscriptionAttributesResponse{
Xmlns: models.BASE_XMLNS,
Result: result,
Metadata: app.ResponseMetadata{RequestId: uuid}}

}
}
}
return utils.CreateErrorResponseV1("SubscriptionNotFound", false)
return http.StatusOK, respStruct
}
61 changes: 11 additions & 50 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"math/big"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/common"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -144,55 +143,6 @@ func ConfirmSubscription(w http.ResponseWriter, req *http.Request) {

}

func SetSubscriptionAttributes(w http.ResponseWriter, req *http.Request) {
content := req.FormValue("ContentType")
subsArn := req.FormValue("SubscriptionArn")
Attribute := req.FormValue("AttributeName")
Value := req.FormValue("AttributeValue")

for _, topic := range app.SyncTopics.Topics {
for _, sub := range topic.Subscriptions {
if sub.SubscriptionArn == subsArn {
if Attribute == "RawMessageDelivery" {
app.SyncTopics.Lock()
if Value == "true" {
sub.Raw = true
} else {
sub.Raw = false
}
app.SyncTopics.Unlock()
//Good Response == return
uuid, _ := common.NewUUID()
respStruct := app.SetSubscriptionAttributesResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: uuid}}
SendResponseBack(w, req, respStruct, content)
return
}

if Attribute == "FilterPolicy" {
filterPolicy := &app.FilterPolicy{}
err := json.Unmarshal([]byte(Value), filterPolicy)
if err != nil {
createErrorResponse(w, req, "ValidationError")
return
}

app.SyncTopics.Lock()
sub.FilterPolicy = filterPolicy
app.SyncTopics.Unlock()

//Good Response == return
uuid, _ := common.NewUUID()
respStruct := app.SetSubscriptionAttributesResponse{"http://queue.amazonaws.com/doc/2012-11-05/", app.ResponseMetadata{RequestId: uuid}}
SendResponseBack(w, req, respStruct, content)
return
}

}
}
}
createErrorResponse(w, req, "SubscriptionNotFound")
}

// NOTE: The use case for this is to use GoAWS to call some external system with the message payload. Essentially
// it is a localized subscription to some non-AWS endpoint.
func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) error {
Expand Down Expand Up @@ -277,6 +227,17 @@ func extractMessageFromJSON(msg string, protocol string) (string, error) {
return defaultMsg, nil
}

func getSubscription(subsArn string) *app.Subscription {
for _, topic := range app.SyncTopics.Topics {
for _, sub := range topic.Subscriptions {
if sub.SubscriptionArn == subsArn {
return sub
}
}
}
return nil
}

func createErrorResponse(w http.ResponseWriter, req *http.Request, err string) {
er := models.SnsErrors[err]
respStruct := models.ErrorResponse{
Expand Down
69 changes: 0 additions & 69 deletions app/gosns/gosns_test.go

This file was deleted.

66 changes: 66 additions & 0 deletions app/gosns/set_subscription_attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package gosns

import (
"encoding/json"
"fmt"
"net/http"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/interfaces"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)

func SetSubscriptionAttributesV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
requestBody := models.NewSetSubscriptionAttributesRequest()
ok := utils.REQUEST_TRANSFORMER(requestBody, req, false)
if !ok {
log.Error("Invalid Request - SetSubscriptionAttributesV1")
return utils.CreateErrorResponseV1("InvalidParameterValue", false)
}

subsArn := requestBody.SubscriptionArn
attrName := requestBody.AttributeName
attrValue := requestBody.AttributeValue

sub := getSubscription(subsArn)
if sub == nil {
return utils.CreateErrorResponseV1("SubscriptionNotFound", false)
}

switch attrName {
case "RawMessageDelivery":
app.SyncTopics.Lock()
if attrValue == "true" {
sub.Raw = true
} else {
sub.Raw = false
}
app.SyncTopics.Unlock()

case "FilterPolicy":
filterPolicy := &app.FilterPolicy{}
err := json.Unmarshal([]byte(attrValue), filterPolicy)
if err != nil {
return utils.CreateErrorResponseV1("InvalidParameterValue", false)
}
app.SyncTopics.Lock()
sub.FilterPolicy = filterPolicy
app.SyncTopics.Unlock()

case "DeliveryPolicy", "FilterPolicyScope", "RedrivePolicy", "SubscriptionRoleArn":
log.Info(fmt.Sprintf("AttributeName [%s] is valid on AWS but it is not implemented.", attrName))

default:
return utils.CreateErrorResponseV1("InvalidParameterValue", false)
}

uuid := uuid.NewString()
respStruct := models.SetSubscriptionAttributesResponse{
Xmlns: models.BASE_XMLNS,
Metadata: app.ResponseMetadata{RequestId: uuid}}

return http.StatusOK, respStruct
}
Loading

0 comments on commit 8a8707a

Please sign in to comment.