Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: add samples for dead letter topics #1279

Merged
merged 9 commits into from
Mar 25, 2020
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
module github.com/GoogleCloudPlatform/golang-samples

go 1.11
go 1.14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep as 1.11. This signifies the Go version compatibility, which we need to keep at 1.11, the oldest supported GAE/GCF version.


require (
cloud.google.com/go v0.55.0
cloud.google.com/go/bigquery v1.5.0
cloud.google.com/go/bigtable v1.1.0
cloud.google.com/go/bigtable v1.3.0
cloud.google.com/go/datastore v1.1.0
cloud.google.com/go/firestore v1.1.0
cloud.google.com/go/firestore v1.1.1
cloud.google.com/go/logging v1.0.0
cloud.google.com/go/pubsub v1.3.1
cloud.google.com/go/spanner v1.4.0
cloud.google.com/go/storage v1.6.0
contrib.go.opencensus.io/exporter/stackdriver v0.13.0
github.com/aws/aws-sdk-go v1.29.29
github.com/aws/aws-sdk-go v1.29.30
github.com/bmatcuk/doublestar v1.2.2
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
Expand All @@ -36,15 +36,15 @@ require (
github.com/mailjet/mailjet-apiv3-go v0.0.0-20190724151621-55e56f74078c
github.com/philhofer/fwd v1.0.0 // indirect
github.com/sendgrid/smtpapi-go v0.6.0 // indirect
github.com/tinylib/msgp v1.1.0 // indirect
github.com/tinylib/msgp v1.1.2 // indirect
go.opencensus.io v0.22.3
golang.org/x/exp v0.0.0-20200320212757-167ffe94c325
golang.org/x/net v0.0.0-20200320220750-118fecf932d8
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/text v0.3.2
google.golang.org/api v0.20.0
google.golang.org/appengine v1.6.5
google.golang.org/genproto v0.0.0-20200319113533-08878b785e9c
google.golang.org/genproto v0.0.0-20200323114720-3f67cca34472
google.golang.org/grpc v1.28.0
gopkg.in/sendgrid/sendgrid-go.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.8
Expand Down
88 changes: 12 additions & 76 deletions go.sum

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions pubsub/subscriptions/dead_letter_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_create_subscription]
import (
"context"
"fmt"
"io"
"time"

"cloud.google.com/go/pubsub"
)

// createSubWithDeadLetter creates a subscription with a dead letter policy.
func createSubWithDeadLetter(w io.Writer, projectID, subID string, topicID string, fullyQualifiedDeadLetterTopic string) error {
// projectID := "my-project-id"
// subID := "my-sub"
// topicID := "my-topic"
// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

topic := client.Topic(topicID)

subConfig := pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 20 * time.Second,
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: fullyQualifiedDeadLetterTopic,
MaxDeliveryAttempts: 10,
},
}

sub, err := client.CreateSubscription(ctx, subID, subConfig)
if err != nil {
return fmt.Errorf("CreateSubscription: %v", err)
}
fmt.Fprintf(w, "Created subscription (%s) with dead letter topic (%s)\n", sub.String(), fullyQualifiedDeadLetterTopic)
fmt.Fprintln(w, "To process dead letter messages, remember to add a subscription to your dead letter topic.")
return nil
}

// [END pubsub_dead_letter_create_subscription]
56 changes: 56 additions & 0 deletions pubsub/subscriptions/dead_letter_delivery_attempt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_delivery_attempt]
import (
"context"
"fmt"
"io"
"time"

"cloud.google.com/go/pubsub"
)

func pullMsgsDeadLetterDeliveryAttempt(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

// Receive messages for 10 seconds.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

sub := client.Subscription(subID)
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// When dead lettering is enabled, the delivery attempt field is a pointer to the
// the number of times the service has attempted to delivery a message.
// Otherwise, the field is nil.
if msg.DeliveryAttempt != nil {
fmt.Fprintf(w, "message: %s, delivery attempts: %d", msg.Data, *msg.DeliveryAttempt)
}
msg.Ack()
})
if err != nil {
return fmt.Errorf("got error in Receive: %v", err)
}
return nil
}

// [END pubsub_dead_letter_delivery_attempt]
46 changes: 46 additions & 0 deletions pubsub/subscriptions/dead_letter_remove.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_remove]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/pubsub"
)

// removeDeadLetterTopic removes the dead letter policy from a subscription.
func removeDeadLetterTopic(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

subConfig, err := client.Subscription(subID).Update(ctx, pubsub.SubscriptionConfigToUpdate{
DeadLetterPolicy: &pubsub.DeadLetterPolicy{},
})
if err != nil {
return fmt.Errorf("Update: %v", err)
}
fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
return nil
}

// [END pubsub_dead_letter_remove]
52 changes: 52 additions & 0 deletions pubsub/subscriptions/dead_letter_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_update]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/pubsub"
)

// updateDeadLetter updates an existing subscription with a dead letter policy.
func updateDeadLetter(w io.Writer, projectID, subID string, fullyQualifiedDeadLetterTopic string) error {
// projectID := "my-project-id"
// subID := "my-sub"
// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

updateConfig := pubsub.SubscriptionConfigToUpdate{
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: fullyQualifiedDeadLetterTopic,
MaxDeliveryAttempts: 20,
},
}

subConfig, err := client.Subscription(subID).Update(ctx, updateConfig)
if err != nil {
return fmt.Errorf("Update: %v", err)
}
fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
return nil
}

// [END pubsub_dead_letter_update]
Loading