Skip to content

Commit

Permalink
pubsub: add samples for dead letter topics (#1279)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex authored Mar 25, 2020
1 parent 35208ca commit 92eae2e
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 82 deletions.
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ go 1.11
require (
cloud.google.com/go v0.55.0
cloud.google.com/go/bigquery v1.5.0
cloud.google.com/go/bigtable v1.1.0
cloud.google.com/go/bigtable v1.3.0
cloud.google.com/go/datastore v1.1.0
cloud.google.com/go/firestore v1.1.0
cloud.google.com/go/firestore v1.1.1
cloud.google.com/go/logging v1.0.0
cloud.google.com/go/pubsub v1.3.1
cloud.google.com/go/spanner v1.4.0
cloud.google.com/go/storage v1.6.0
contrib.go.opencensus.io/exporter/stackdriver v0.13.0
github.com/aws/aws-sdk-go v1.29.29
github.com/aws/aws-sdk-go v1.29.30
github.com/bmatcuk/doublestar v1.2.2
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
Expand All @@ -36,15 +36,15 @@ require (
github.com/mailjet/mailjet-apiv3-go v0.0.0-20190724151621-55e56f74078c
github.com/philhofer/fwd v1.0.0 // indirect
github.com/sendgrid/smtpapi-go v0.6.0 // indirect
github.com/tinylib/msgp v1.1.0 // indirect
github.com/tinylib/msgp v1.1.2 // indirect
go.opencensus.io v0.22.3
golang.org/x/exp v0.0.0-20200320212757-167ffe94c325
golang.org/x/net v0.0.0-20200320220750-118fecf932d8
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/text v0.3.2
google.golang.org/api v0.20.0
google.golang.org/appengine v1.6.5
google.golang.org/genproto v0.0.0-20200319113533-08878b785e9c
google.golang.org/genproto v0.0.0-20200323114720-3f67cca34472
google.golang.org/grpc v1.28.0
gopkg.in/sendgrid/sendgrid-go.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.8
Expand Down
88 changes: 12 additions & 76 deletions go.sum

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions pubsub/subscriptions/dead_letter_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_create_subscription]
import (
"context"
"fmt"
"io"
"time"

"cloud.google.com/go/pubsub"
)

// createSubWithDeadLetter creates a subscription with a dead letter policy.
func createSubWithDeadLetter(w io.Writer, projectID, subID string, topicID string, fullyQualifiedDeadLetterTopic string) error {
// projectID := "my-project-id"
// subID := "my-sub"
// topicID := "my-topic"
// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

topic := client.Topic(topicID)

subConfig := pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 20 * time.Second,
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: fullyQualifiedDeadLetterTopic,
MaxDeliveryAttempts: 10,
},
}

sub, err := client.CreateSubscription(ctx, subID, subConfig)
if err != nil {
return fmt.Errorf("CreateSubscription: %v", err)
}
fmt.Fprintf(w, "Created subscription (%s) with dead letter topic (%s)\n", sub.String(), fullyQualifiedDeadLetterTopic)
fmt.Fprintln(w, "To process dead letter messages, remember to add a subscription to your dead letter topic.")
return nil
}

// [END pubsub_dead_letter_create_subscription]
56 changes: 56 additions & 0 deletions pubsub/subscriptions/dead_letter_delivery_attempt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_delivery_attempt]
import (
"context"
"fmt"
"io"
"time"

"cloud.google.com/go/pubsub"
)

func pullMsgsDeadLetterDeliveryAttempt(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

// Receive messages for 10 seconds.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

sub := client.Subscription(subID)
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// When dead lettering is enabled, the delivery attempt field is a pointer to the
// the number of times the service has attempted to delivery a message.
// Otherwise, the field is nil.
if msg.DeliveryAttempt != nil {
fmt.Fprintf(w, "message: %s, delivery attempts: %d", msg.Data, *msg.DeliveryAttempt)
}
msg.Ack()
})
if err != nil {
return fmt.Errorf("got error in Receive: %v", err)
}
return nil
}

// [END pubsub_dead_letter_delivery_attempt]
46 changes: 46 additions & 0 deletions pubsub/subscriptions/dead_letter_remove.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_remove]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/pubsub"
)

// removeDeadLetterTopic removes the dead letter policy from a subscription.
func removeDeadLetterTopic(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

subConfig, err := client.Subscription(subID).Update(ctx, pubsub.SubscriptionConfigToUpdate{
DeadLetterPolicy: &pubsub.DeadLetterPolicy{},
})
if err != nil {
return fmt.Errorf("Update: %v", err)
}
fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
return nil
}

// [END pubsub_dead_letter_remove]
52 changes: 52 additions & 0 deletions pubsub/subscriptions/dead_letter_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

// [START pubsub_dead_letter_update_subscription]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/pubsub"
)

// updateDeadLetter updates an existing subscription with a dead letter policy.
func updateDeadLetter(w io.Writer, projectID, subID string, fullyQualifiedDeadLetterTopic string) error {
// projectID := "my-project-id"
// subID := "my-sub"
// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}

updateConfig := pubsub.SubscriptionConfigToUpdate{
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: fullyQualifiedDeadLetterTopic,
MaxDeliveryAttempts: 20,
},
}

subConfig, err := client.Subscription(subID).Update(ctx, updateConfig)
if err != nil {
return fmt.Errorf("Update: %v", err)
}
fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
return nil
}

// [END pubsub_dead_letter_update_subscription]
Loading

0 comments on commit 92eae2e

Please sign in to comment.