Skip to content

Commit

Permalink
fix(gcp): Support topic imports for GCP. (#722)
Browse files Browse the repository at this point in the history
* fix(gcp): Support topic imports for GCP.
* fix(gcp): Allow generic json parsing for topic subscriptions

Co-authored-by: Ryan Cartwright <39504851+HomelessDinosaur@users.noreply.github.com>
Co-authored-by: Ryan Cartwright <39504851+HomelessDinosaur@users.noreply.github.com>
  • Loading branch information
tjholm and HomelessDinosaur authored Jan 21, 2025
1 parent b74d46b commit 137a528
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 32 deletions.
1 change: 1 addition & 0 deletions cloud/gcp/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GcpDatabaseConfig struct {
type GcpImports struct {
// A map of nitric names to GCP Secret Manager names
Secrets map[string]string
Topics map[string]string
}

type GcpBatchCompute struct {
Expand Down
7 changes: 7 additions & 0 deletions cloud/gcp/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

apiv1 "cloud.google.com/go/firestore/apiv1/admin"
"cloud.google.com/go/firestore/apiv1/admin/adminpb"
gcppubsub "cloud.google.com/go/pubsub/apiv1"
gcpsecretmanager "cloud.google.com/go/secretmanager/apiv1"
"github.com/nitrictech/nitric/cloud/common/deploy"
"github.com/nitrictech/nitric/cloud/common/deploy/provider"
Expand Down Expand Up @@ -74,6 +75,7 @@ type NitricGcpPulumiProvider struct {
BaseComputeRole *projects.IAMCustomRole

SecretManagerClient *gcpsecretmanager.Client
PubsubClient *gcppubsub.PublisherClient

JobDefinitionBucket *storage.Bucket
JobDefinitions map[string]*storage.BucketObject
Expand Down Expand Up @@ -138,6 +140,11 @@ func (a *NitricGcpPulumiProvider) Init(attributes map[string]interface{}) error
return err
}

a.PubsubClient, err = gcppubsub.NewPublisherClient(context.Background())
if err != nil {
return err
}

return nil
}

Expand Down
58 changes: 55 additions & 3 deletions cloud/gcp/deploy/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package deploy
import (
"fmt"

pubsubv1 "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"github.com/nitrictech/nitric/cloud/common/deploy/resources"
"google.golang.org/protobuf/types/known/fieldmaskpb"

common "github.com/nitrictech/nitric/cloud/common/deploy/tags"
deploymentspb "github.com/nitrictech/nitric/core/pkg/proto/deployments/v1"
Expand All @@ -28,21 +31,70 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// tagTopic - applies nitric tags to an existing topic in GCP and adds it to the stack.
func tagTopic(ctx *pulumi.Context, name string, projectId string, topicName string, tags map[string]string, client *pubsubv1.PublisherClient, opts []pulumi.ResourceOption) (*pubsub.Topic, error) {
topicLookup, err := pubsub.LookupTopic(ctx, &pubsub.LookupTopicArgs{
Project: &projectId,
Name: topicName,
})
if err != nil {
return nil, err
}

_, err = client.UpdateTopic(ctx.Context(), &pubsubpb.UpdateTopicRequest{
Topic: &pubsubpb.Topic{
Name: topicLookup.Id,
Labels: tags,
},
UpdateMask: &fieldmaskpb.FieldMask{
Paths: []string{"labels"},
},
})
if err != nil {
return nil, err
}

topic, err := pubsub.GetTopic(
ctx,
name,
pulumi.ID(topicLookup.Id),
nil,
// nitric didn't create this resource, so it shouldn't delete it either.
append(opts, pulumi.RetainOnDelete(true))...,
)
if err != nil {
return nil, err
}
return topic, nil
}

func createTopic(ctx *pulumi.Context, name string, stackId string, tags map[string]string, opts []pulumi.ResourceOption) (*pubsub.Topic, error) {
return pubsub.NewTopic(ctx, name, &pubsub.TopicArgs{
Labels: pulumi.ToStringMap(tags),
}, opts...)
}

func GetSubName(serviceName string, topicName string) string {
return fmt.Sprintf("%s-%s-sub", serviceName, topicName)
}

func (p *NitricGcpPulumiProvider) Topic(ctx *pulumi.Context, parent pulumi.Resource, name string, config *deploymentspb.Topic) error {
var err error
var topic *pubsub.Topic
opts := append([]pulumi.ResourceOption{}, pulumi.Parent(parent))

p.Topics[name], err = pubsub.NewTopic(ctx, name, &pubsub.TopicArgs{
Labels: pulumi.ToStringMap(common.Tags(p.StackId, name, resources.Topic)),
}, p.WithDefaultResourceOptions(opts...)...)
if gcpName, ok := p.GcpConfig.Import.Topics[name]; ok {
topic, err = tagTopic(ctx, name, p.GcpConfig.ProjectId, gcpName, common.Tags(p.StackId, name, resources.Topic), p.PubsubClient, opts)
} else {
topic, err = createTopic(ctx, name, p.StackName, common.Tags(p.StackId, name, resources.Topic), opts)
}

if err != nil {
return err
}

p.Topics[name] = topic

for _, sub := range config.Subscriptions {
targetService := p.CloudRunServices[sub.GetService()]

Expand Down
50 changes: 21 additions & 29 deletions cloud/gcp/runtime/gateway/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
topicpb "github.com/nitrictech/nitric/core/pkg/proto/topics/v1"
topicspb "github.com/nitrictech/nitric/core/pkg/proto/topics/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
)

type gcpMiddleware struct {
Expand Down Expand Up @@ -78,19 +79,28 @@ func (g *gcpMiddleware) handleSubscription(opts *gateway.GatewayStartOpts) fasth
var message topicpb.TopicMessage
err := proto.Unmarshal(pubsubEvent.Message.Data, &message)
if err != nil {
ctx.Error("could not unmarshal event data", 500)
return
fmt.Println("could not parse message as a nitric event attempting to parse as generic json payload")

messageData := map[string]any{}
err := json.Unmarshal(pubsubEvent.Message.Data, &messageData)
if err != nil {
ctx.Error("could not unmarshal event data", 500)
return
}

structPayload, err := structpb.NewStruct(messageData)
if err != nil {
ctx.Error("could not convert message data to struct", 500)
return
}

message = topicpb.TopicMessage{
Content: &topicspb.TopicMessage_StructPayload{
StructPayload: structPayload,
},
}
}

// event := &faaspb.TriggerRequest{
// Context: &faaspb.TriggerRequest_Topic{
// Topic: &faaspb.TopicTriggerContext{
// Topic: topicName,
// Message: &message,
// },
// },
// }

event := &topicspb.ServerMessage{
Content: &topicspb.ServerMessage_MessageRequest{
MessageRequest: &topicspb.MessageRequest{
Expand All @@ -100,24 +110,6 @@ func (g *gcpMiddleware) handleSubscription(opts *gateway.GatewayStartOpts) fasth
},
}

// worker, err := process.GetWorker(&pool.GetWorkerOptions{
// Trigger: event,
// })
// if err != nil {
// ctx.Error("Could not find handle for event", 500)
// }

// traceKey := propagator.CloudTraceFormatPropagator{}.Fields()[0]
// traceCtx := context.TODO()

// if pubsubEvent.Message.Attributes[traceKey] != "" {
// var mc propagation.MapCarrier = pubsubEvent.Message.Attributes
// traceCtx = propagator.CloudTraceFormatPropagator{}.Extract(traceCtx, mc)
// } else {
// var hc propagation.HeaderCarrier = base_http.HttpHeadersToMap(&ctx.Request.Header)
// traceCtx = propagator.CloudTraceFormatPropagator{}.Extract(traceCtx, hc)
// }

response, err := opts.TopicsListenerPlugin.HandleRequest(event)
if err != nil {
ctx.Error(fmt.Sprintf("Error handling event %v", err), 500)
Expand Down
59 changes: 59 additions & 0 deletions cloud/gcp/runtime/gateway/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,5 +213,64 @@ var _ = Describe("Http", func() {
Expect(string(responseBody)).To(Equal("success"))
})
})

When("From a subscription with a JSON event", func() {
content := map[string]interface{}{
"Test": "Test",
}

messageBytes, err := json.Marshal(content)
Expect(err).To(BeNil())

b64Event := base64.StdEncoding.EncodeToString(messageBytes)
payloadBytes, _ := json.Marshal(&map[string]interface{}{
"subscription": "test",
"message": map[string]interface{}{
"attributes": map[string]string{
"x-nitric-topic": "test",
},
"id": "test",
"data": b64Event,
},
})

It("Should handle the event successfully", func() {
var capturedRequest *topicspb.ServerMessage

By("Handling exactly 1 request")
mockTopicRequestHandler.EXPECT().HandleRequest(gomock.Any()).Times(1).DoAndReturn(func(arg0 interface{}) (*topicspb.ClientMessage, error) {
capturedRequest = arg0.(*topicspb.ServerMessage)

return &topicspb.ClientMessage{
Id: "test",
Content: &topicspb.ClientMessage_MessageResponse{
MessageResponse: &topicspb.MessageResponse{
Success: true,
},
},
}, nil
})

request, err := http.NewRequest("POST", fmt.Sprintf("%s/x-nitric-topic/test", gatewayUrl), bytes.NewReader(payloadBytes))
Expect(err).To(BeNil())
request.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(request)
Expect(err).To(BeNil())
responseBody, _ := io.ReadAll(resp.Body)

By("Not returning an error")
Expect(err).To(BeNil())

capturedMessageBody := capturedRequest.GetMessageRequest().GetMessage().GetStructPayload().AsMap()
By("Having the orignal payload translated")
Expect(capturedMessageBody["Test"]).To(Equal("Test"))

By("The request returns a successful status")
Expect(resp.StatusCode).To(Equal(200))

By("Returning the expected output")
Expect(string(responseBody)).To(Equal("success"))
})
})
})
})

0 comments on commit 137a528

Please sign in to comment.