diff --git a/cloud/gcp/common/config.go b/cloud/gcp/common/config.go index d44ba9c50..79b6761c3 100644 --- a/cloud/gcp/common/config.go +++ b/cloud/gcp/common/config.go @@ -36,6 +36,7 @@ type GcpDatabaseConfig struct { type GcpImports struct { // A map of nitric names to GCP Secret Manager names Secrets map[string]string + Topics map[string]string } type GcpBatchCompute struct { diff --git a/cloud/gcp/deploy/deploy.go b/cloud/gcp/deploy/deploy.go index 7e795e71e..8632de564 100644 --- a/cloud/gcp/deploy/deploy.go +++ b/cloud/gcp/deploy/deploy.go @@ -26,6 +26,7 @@ import ( apiv1 "cloud.google.com/go/firestore/apiv1/admin" "cloud.google.com/go/firestore/apiv1/admin/adminpb" + gcppubsub "cloud.google.com/go/pubsub/apiv1" gcpsecretmanager "cloud.google.com/go/secretmanager/apiv1" "github.com/nitrictech/nitric/cloud/common/deploy" "github.com/nitrictech/nitric/cloud/common/deploy/provider" @@ -74,6 +75,7 @@ type NitricGcpPulumiProvider struct { BaseComputeRole *projects.IAMCustomRole SecretManagerClient *gcpsecretmanager.Client + PubsubClient *gcppubsub.PublisherClient JobDefinitionBucket *storage.Bucket JobDefinitions map[string]*storage.BucketObject @@ -138,6 +140,11 @@ func (a *NitricGcpPulumiProvider) Init(attributes map[string]interface{}) error return err } + a.PubsubClient, err = gcppubsub.NewPublisherClient(context.Background()) + if err != nil { + return err + } + return nil } diff --git a/cloud/gcp/deploy/topic.go b/cloud/gcp/deploy/topic.go index b52840ac8..ee254c6f4 100644 --- a/cloud/gcp/deploy/topic.go +++ b/cloud/gcp/deploy/topic.go @@ -19,7 +19,10 @@ package deploy import ( "fmt" + pubsubv1 "cloud.google.com/go/pubsub/apiv1" + "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/nitrictech/nitric/cloud/common/deploy/resources" + "google.golang.org/protobuf/types/known/fieldmaskpb" common "github.com/nitrictech/nitric/cloud/common/deploy/tags" deploymentspb "github.com/nitrictech/nitric/core/pkg/proto/deployments/v1" @@ -28,21 +31,70 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/pulumi" ) +// tagTopic - applies nitric tags to an existing topic in GCP and adds it to the stack. +func tagTopic(ctx *pulumi.Context, name string, projectId string, topicName string, tags map[string]string, client *pubsubv1.PublisherClient, opts []pulumi.ResourceOption) (*pubsub.Topic, error) { + topicLookup, err := pubsub.LookupTopic(ctx, &pubsub.LookupTopicArgs{ + Project: &projectId, + Name: topicName, + }) + if err != nil { + return nil, err + } + + _, err = client.UpdateTopic(ctx.Context(), &pubsubpb.UpdateTopicRequest{ + Topic: &pubsubpb.Topic{ + Name: topicLookup.Id, + Labels: tags, + }, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{"labels"}, + }, + }) + if err != nil { + return nil, err + } + + topic, err := pubsub.GetTopic( + ctx, + name, + pulumi.ID(topicLookup.Id), + nil, + // nitric didn't create this resource, so it shouldn't delete it either. + append(opts, pulumi.RetainOnDelete(true))..., + ) + if err != nil { + return nil, err + } + return topic, nil +} + +func createTopic(ctx *pulumi.Context, name string, stackId string, tags map[string]string, opts []pulumi.ResourceOption) (*pubsub.Topic, error) { + return pubsub.NewTopic(ctx, name, &pubsub.TopicArgs{ + Labels: pulumi.ToStringMap(tags), + }, opts...) +} + func GetSubName(serviceName string, topicName string) string { return fmt.Sprintf("%s-%s-sub", serviceName, topicName) } func (p *NitricGcpPulumiProvider) Topic(ctx *pulumi.Context, parent pulumi.Resource, name string, config *deploymentspb.Topic) error { var err error + var topic *pubsub.Topic opts := append([]pulumi.ResourceOption{}, pulumi.Parent(parent)) - p.Topics[name], err = pubsub.NewTopic(ctx, name, &pubsub.TopicArgs{ - Labels: pulumi.ToStringMap(common.Tags(p.StackId, name, resources.Topic)), - }, p.WithDefaultResourceOptions(opts...)...) + if gcpName, ok := p.GcpConfig.Import.Topics[name]; ok { + topic, err = tagTopic(ctx, name, p.GcpConfig.ProjectId, gcpName, common.Tags(p.StackId, name, resources.Topic), p.PubsubClient, opts) + } else { + topic, err = createTopic(ctx, name, p.StackName, common.Tags(p.StackId, name, resources.Topic), opts) + } + if err != nil { return err } + p.Topics[name] = topic + for _, sub := range config.Subscriptions { targetService := p.CloudRunServices[sub.GetService()] diff --git a/cloud/gcp/runtime/gateway/http.go b/cloud/gcp/runtime/gateway/http.go index 988435b0b..1de9b83df 100644 --- a/cloud/gcp/runtime/gateway/http.go +++ b/cloud/gcp/runtime/gateway/http.go @@ -32,6 +32,7 @@ import ( topicpb "github.com/nitrictech/nitric/core/pkg/proto/topics/v1" topicspb "github.com/nitrictech/nitric/core/pkg/proto/topics/v1" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" ) type gcpMiddleware struct { @@ -78,19 +79,28 @@ func (g *gcpMiddleware) handleSubscription(opts *gateway.GatewayStartOpts) fasth var message topicpb.TopicMessage err := proto.Unmarshal(pubsubEvent.Message.Data, &message) if err != nil { - ctx.Error("could not unmarshal event data", 500) - return + fmt.Println("could not parse message as a nitric event attempting to parse as generic json payload") + + messageData := map[string]any{} + err := json.Unmarshal(pubsubEvent.Message.Data, &messageData) + if err != nil { + ctx.Error("could not unmarshal event data", 500) + return + } + + structPayload, err := structpb.NewStruct(messageData) + if err != nil { + ctx.Error("could not convert message data to struct", 500) + return + } + + message = topicpb.TopicMessage{ + Content: &topicspb.TopicMessage_StructPayload{ + StructPayload: structPayload, + }, + } } - // event := &faaspb.TriggerRequest{ - // Context: &faaspb.TriggerRequest_Topic{ - // Topic: &faaspb.TopicTriggerContext{ - // Topic: topicName, - // Message: &message, - // }, - // }, - // } - event := &topicspb.ServerMessage{ Content: &topicspb.ServerMessage_MessageRequest{ MessageRequest: &topicspb.MessageRequest{ @@ -100,24 +110,6 @@ func (g *gcpMiddleware) handleSubscription(opts *gateway.GatewayStartOpts) fasth }, } - // worker, err := process.GetWorker(&pool.GetWorkerOptions{ - // Trigger: event, - // }) - // if err != nil { - // ctx.Error("Could not find handle for event", 500) - // } - - // traceKey := propagator.CloudTraceFormatPropagator{}.Fields()[0] - // traceCtx := context.TODO() - - // if pubsubEvent.Message.Attributes[traceKey] != "" { - // var mc propagation.MapCarrier = pubsubEvent.Message.Attributes - // traceCtx = propagator.CloudTraceFormatPropagator{}.Extract(traceCtx, mc) - // } else { - // var hc propagation.HeaderCarrier = base_http.HttpHeadersToMap(&ctx.Request.Header) - // traceCtx = propagator.CloudTraceFormatPropagator{}.Extract(traceCtx, hc) - // } - response, err := opts.TopicsListenerPlugin.HandleRequest(event) if err != nil { ctx.Error(fmt.Sprintf("Error handling event %v", err), 500) diff --git a/cloud/gcp/runtime/gateway/http_test.go b/cloud/gcp/runtime/gateway/http_test.go index dea972f51..6214260fa 100644 --- a/cloud/gcp/runtime/gateway/http_test.go +++ b/cloud/gcp/runtime/gateway/http_test.go @@ -213,5 +213,64 @@ var _ = Describe("Http", func() { Expect(string(responseBody)).To(Equal("success")) }) }) + + When("From a subscription with a JSON event", func() { + content := map[string]interface{}{ + "Test": "Test", + } + + messageBytes, err := json.Marshal(content) + Expect(err).To(BeNil()) + + b64Event := base64.StdEncoding.EncodeToString(messageBytes) + payloadBytes, _ := json.Marshal(&map[string]interface{}{ + "subscription": "test", + "message": map[string]interface{}{ + "attributes": map[string]string{ + "x-nitric-topic": "test", + }, + "id": "test", + "data": b64Event, + }, + }) + + It("Should handle the event successfully", func() { + var capturedRequest *topicspb.ServerMessage + + By("Handling exactly 1 request") + mockTopicRequestHandler.EXPECT().HandleRequest(gomock.Any()).Times(1).DoAndReturn(func(arg0 interface{}) (*topicspb.ClientMessage, error) { + capturedRequest = arg0.(*topicspb.ServerMessage) + + return &topicspb.ClientMessage{ + Id: "test", + Content: &topicspb.ClientMessage_MessageResponse{ + MessageResponse: &topicspb.MessageResponse{ + Success: true, + }, + }, + }, nil + }) + + request, err := http.NewRequest("POST", fmt.Sprintf("%s/x-nitric-topic/test", gatewayUrl), bytes.NewReader(payloadBytes)) + Expect(err).To(BeNil()) + request.Header.Add("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(request) + Expect(err).To(BeNil()) + responseBody, _ := io.ReadAll(resp.Body) + + By("Not returning an error") + Expect(err).To(BeNil()) + + capturedMessageBody := capturedRequest.GetMessageRequest().GetMessage().GetStructPayload().AsMap() + By("Having the orignal payload translated") + Expect(capturedMessageBody["Test"]).To(Equal("Test")) + + By("The request returns a successful status") + Expect(resp.StatusCode).To(Equal(200)) + + By("Returning the expected output") + Expect(string(responseBody)).To(Equal("success")) + }) + }) }) })