Skip to content
This repository was archived by the owner on Jan 19, 2022. It is now read-only.

Commit 82a0a1b

Browse files
authoredMay 22, 2019
PubsubTemplate publish to topics in other projects (#1678)
This change to the DefaultPublisherFactory allows the project id of the topic to be overwitten directly from the topic specification in the topic string. This allows use-cases like pubSubTemplate.publish("projects/other-project/topics/the-topic", "payload"). This change applied across the board to allow publishing, subscribing, creating, and deleting topics using fully-qualified topic names. Fixes #1678.
1 parent 0ff9d6f commit 82a0a1b

File tree

5 files changed

+167
-19
lines changed

5 files changed

+167
-19
lines changed
 

‎docs/src/main/asciidoc/pubsub.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ It provides the common set of operations needed to interact with Google Cloud Pu
3939

4040
`PubSubTemplate` provides asynchronous methods to publish messages to a Google Cloud Pub/Sub topic.
4141
The `publish()` method takes in a topic name to post the message to, a payload of a generic type and, optionally, a map with the message headers.
42+
The topic name could either be a canonical topic name within the current project, or the fully-qualified name referring to a topic in a different project using the `projects/<project_name>/topics/<topic_name>` format.
4243

4344
Here is an example of how to publish a message to a Google Cloud Pub/Sub topic:
4445

@@ -192,6 +193,8 @@ flux.doOnNext(AcknowledgeablePubsubMessage::ack);
192193
`PubSubAdmin` is the abstraction provided by Spring Cloud GCP to manage Google Cloud Pub/Sub resources.
193194
It allows for the creation, deletion and listing of topics and subscriptions.
194195

196+
NOTE: Generally when referring to topics, you can either use the short canonical topic name within the current project, or the fully-qualified name referring to a topic in a different project using the `projects/<project_name>/topics/<topic_name>` format.
197+
195198
`PubSubAdmin` depends on `GcpProjectIdProvider` and either a `CredentialsProvider` or a `TopicAdminClient` and a `SubscriptionAdminClient`.
196199
If given a `CredentialsProvider`, it creates a `TopicAdminClient` and a `SubscriptionAdminClient` with the Google Cloud Java Library for Pub/Sub default settings.
197200
The Spring Boot starter for GCP Pub/Sub auto-configures a `PubSubAdmin` object using the `GcpProjectIdProvider` and the `CredentialsProvider` auto-configured by the Spring Boot GCP Core starter.

‎spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/PubSubAdmin.java

+21-16
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,17 @@
3030
import com.google.cloud.pubsub.v1.TopicAdminSettings;
3131
import com.google.pubsub.v1.ProjectName;
3232
import com.google.pubsub.v1.ProjectSubscriptionName;
33-
import com.google.pubsub.v1.ProjectTopicName;
3433
import com.google.pubsub.v1.PushConfig;
3534
import com.google.pubsub.v1.Subscription;
3635
import com.google.pubsub.v1.Topic;
3736

3837
import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
38+
import org.springframework.cloud.gcp.pubsub.support.PubSubTopicUtils;
3939
import org.springframework.util.Assert;
4040

4141
/**
42-
* Pub/Sub admin utility that creates new topics and subscriptions on Google Cloud Pub/Sub.
42+
* Pub/Sub admin utility that creates new topics and subscriptions on Google Cloud
43+
* Pub/Sub.
4344
*
4445
* @author João André Martins
4546
* @author Mike Eltsufin
@@ -96,26 +97,28 @@ public PubSubAdmin(GcpProjectIdProvider projectIdProvider, TopicAdminClient topi
9697
/**
9798
* Create a new topic on Google Cloud Pub/Sub.
9899
*
99-
* @param topicName the name for the new topic
100+
* @param topicName the name for the new topic within the current project, or the
101+
* fully-qualified topic name in the projects/&lt;project_name&gt;/topics/&lt;topic_name&gt; format
100102
* @return the created topic
101103
*/
102104
public Topic createTopic(String topicName) {
103105
Assert.hasText(topicName, "No topic name was specified.");
104106

105-
return this.topicAdminClient.createTopic(ProjectTopicName.of(this.projectId, topicName));
107+
return this.topicAdminClient.createTopic(PubSubTopicUtils.toProjectTopicName(topicName, this.projectId));
106108
}
107109

108110
/**
109111
* Get the configuration of a Google Cloud Pub/Sub topic.
110112
*
111-
* @param topicName canonical topic name, e.g., "topicName"
113+
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
114+
* "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
112115
* @return topic configuration or {@code null} if topic doesn't exist
113116
*/
114117
public Topic getTopic(String topicName) {
115118
Assert.hasText(topicName, "No topic name was specified.");
116119

117120
try {
118-
return this.topicAdminClient.getTopic(ProjectTopicName.of(this.projectId, topicName));
121+
return this.topicAdminClient.getTopic(PubSubTopicUtils.toProjectTopicName(topicName, this.projectId));
119122
}
120123
catch (ApiException aex) {
121124
if (aex.getStatusCode().getCode() == StatusCode.Code.NOT_FOUND) {
@@ -129,12 +132,13 @@ public Topic getTopic(String topicName) {
129132
/**
130133
* Delete a topic from Google Cloud Pub/Sub.
131134
*
132-
* @param topicName the name of the topic to be deleted
135+
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic
136+
* name in the "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
133137
*/
134138
public void deleteTopic(String topicName) {
135139
Assert.hasText(topicName, "No topic name was specified.");
136140

137-
this.topicAdminClient.deleteTopic(ProjectTopicName.of(this.projectId, topicName));
141+
this.topicAdminClient.deleteTopic(PubSubTopicUtils.toProjectTopicName(topicName, this.projectId));
138142
}
139143

140144
/**
@@ -167,8 +171,8 @@ public Subscription createSubscription(String subscriptionName, String topicName
167171
*
168172
* @param subscriptionName the name of the new subscription
169173
* @param topicName the name of the topic being subscribed to
170-
* @param ackDeadline deadline in seconds before a message is resent, must be between 10 and 600 seconds.
171-
* If not provided, set to default of 10 seconds
174+
* @param ackDeadline deadline in seconds before a message is resent, must be between 10
175+
* and 600 seconds. If not provided, set to default of 10 seconds
172176
* @return the created subscription
173177
*/
174178
public Subscription createSubscription(String subscriptionName, String topicName,
@@ -194,11 +198,12 @@ public Subscription createSubscription(String subscriptionName, String topicName
194198
* Create a new subscription on Google Cloud Pub/Sub.
195199
*
196200
* @param subscriptionName the name of the new subscription
197-
* @param topicName the name of the topic being subscribed to
198-
* @param ackDeadline deadline in seconds before a message is resent, must be between 10 and 600 seconds.
199-
* If not provided, set to default of 10 seconds
200-
* @param pushEndpoint the URL of the service receiving the push messages. If not provided, uses
201-
* message pulling by default
201+
* @param topicName canonical topic name, e.g., "topicName", or the fully-qualified topic name in the
202+
* "projects/&lt;project_name&gt;/topics/&lt;topic_name&gt;" format
203+
* @param ackDeadline deadline in seconds before a message is resent, must be between 10
204+
* and 600 seconds. If not provided, set to default of 10 seconds
205+
* @param pushEndpoint the URL of the service receiving the push messages. If not
206+
* provided, uses message pulling by default
202207
* @return the created subscription
203208
*/
204209
public Subscription createSubscription(String subscriptionName, String topicName,
@@ -219,7 +224,7 @@ public Subscription createSubscription(String subscriptionName, String topicName
219224

220225
return this.subscriptionAdminClient.createSubscription(
221226
ProjectSubscriptionName.of(this.projectId, subscriptionName),
222-
ProjectTopicName.of(this.projectId, topicName),
227+
PubSubTopicUtils.toProjectTopicName(topicName, this.projectId),
223228
pushConfigBuilder.build(),
224229
finalAckDeadline);
225230
}

‎spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultPublisherFactory.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.api.gax.rpc.HeaderProvider;
2828
import com.google.api.gax.rpc.TransportChannelProvider;
2929
import com.google.cloud.pubsub.v1.Publisher;
30-
import com.google.pubsub.v1.ProjectTopicName;
3130

3231
import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
3332
import org.springframework.cloud.gcp.pubsub.core.PubSubException;
@@ -65,7 +64,7 @@ public class DefaultPublisherFactory implements PublisherFactory {
6564
/**
6665
* Create {@link DefaultPublisherFactory} instance based on the provided {@link GcpProjectIdProvider}.
6766
* <p>The {@link GcpProjectIdProvider} must not be null, neither provide an empty {@code projectId}.
68-
* @param projectIdProvider provides the GCP project ID
67+
* @param projectIdProvider provides the default GCP project ID for selecting the topic
6968
*/
7069
public DefaultPublisherFactory(GcpProjectIdProvider projectIdProvider) {
7170
Assert.notNull(projectIdProvider, "The project ID provider can't be null.");
@@ -129,7 +128,7 @@ public Publisher createPublisher(String topic) {
129128
return this.publishers.computeIfAbsent(topic, (key) -> {
130129
try {
131130
Publisher.Builder publisherBuilder =
132-
Publisher.newBuilder(ProjectTopicName.of(this.projectId, key));
131+
Publisher.newBuilder(PubSubTopicUtils.toProjectTopicName(topic, this.projectId));
133132

134133
if (this.executorProvider != null) {
135134
publisherBuilder.setExecutorProvider(this.executorProvider);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2017-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gcp.pubsub.support;
18+
19+
import com.google.pubsub.v1.ProjectTopicName;
20+
21+
import org.springframework.lang.Nullable;
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* Various utility methods for dealing with Pub/Sub topics.
26+
*
27+
* @author Mike Eltsufin
28+
* @since 1.2
29+
*/
30+
public final class PubSubTopicUtils {
31+
32+
private PubSubTopicUtils() {
33+
}
34+
35+
/**
36+
* Creates a {@link ProjectTopicName} based on a topic name within a project or the
37+
* fully-qualified topic name. If the specified topic is in the
38+
* projects/&lt;project_name&gt;/topics/&lt;topic_name&gt; format, then the {@code projectId} is
39+
* ignored}
40+
* @param topic the topic name in the project or the fully-qualified project name
41+
* @param projectId the project ID to use if the topic is not a fully-qualified name
42+
* @return the Pub/Sub object representing the topic name
43+
*/
44+
public static ProjectTopicName toProjectTopicName(String topic, @Nullable String projectId) {
45+
Assert.notNull(topic, "The topic can't be null.");
46+
47+
ProjectTopicName projectTopicName = null;
48+
49+
if (ProjectTopicName.isParsableFrom(topic)) {
50+
// Fully-qualified topic name in the "projects/<project_name>/topics/<topic_name>" format
51+
projectTopicName = ProjectTopicName.parse(topic);
52+
}
53+
else {
54+
Assert.notNull(projectId, "The project ID can't be null when using canonical topic name.");
55+
projectTopicName = ProjectTopicName.of(projectId, topic);
56+
}
57+
58+
return projectTopicName;
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2017-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gcp.pubsub.support;
18+
19+
import com.google.pubsub.v1.ProjectTopicName;
20+
import org.junit.Test;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
24+
25+
/**
26+
* Tests for {@link PubSubTopicUtils}.
27+
*
28+
* @author Mike Eltsufin
29+
*/
30+
public class PubSubTopicUtilsTests {
31+
32+
@Test
33+
public void testToProjectTopicName_canonical() {
34+
String project = "projectA";
35+
String topic = "topicA";
36+
String fqn = "projects/" + project + "/topics/" + topic;
37+
38+
ProjectTopicName parsedProjectTopicName = PubSubTopicUtils.toProjectTopicName(topic, project);
39+
40+
assertThat(parsedProjectTopicName).isEqualTo(ProjectTopicName.of(project, topic));
41+
assertThat(parsedProjectTopicName.toString()).isEqualTo(fqn);
42+
}
43+
44+
@Test
45+
public void testToProjectTopicName_no_topic() {
46+
assertThatThrownBy(() -> PubSubTopicUtils.toProjectTopicName(null, "topicA"))
47+
.isInstanceOf(IllegalArgumentException.class)
48+
.hasMessage("The topic can't be null.");
49+
}
50+
51+
@Test
52+
public void testToProjectTopicName_canonical_no_project() {
53+
assertThatThrownBy(() -> PubSubTopicUtils.toProjectTopicName("topicA", null))
54+
.isInstanceOf(IllegalArgumentException.class)
55+
.hasMessage("The project ID can't be null when using canonical topic name.");
56+
}
57+
58+
@Test
59+
public void testToProjectTopicName_fqn() {
60+
String project = "projectA";
61+
String topic = "topicA";
62+
String fqn = "projects/" + project + "/topics/" + topic;
63+
64+
ProjectTopicName parsedProjectTopicName = PubSubTopicUtils.toProjectTopicName(fqn, project);
65+
66+
assertThat(parsedProjectTopicName).isEqualTo(ProjectTopicName.of(project, topic));
67+
assertThat(parsedProjectTopicName.toString()).isEqualTo(fqn);
68+
}
69+
70+
@Test
71+
public void testToProjectTopicName_fqn_no_project() {
72+
String project = "projectA";
73+
String topic = "topicA";
74+
String fqn = "projects/" + project + "/topics/" + topic;
75+
76+
ProjectTopicName parsedProjectTopicName = PubSubTopicUtils.toProjectTopicName(fqn, null);
77+
78+
assertThat(parsedProjectTopicName).isEqualTo(ProjectTopicName.of(project, topic));
79+
assertThat(parsedProjectTopicName.toString()).isEqualTo(fqn);
80+
}
81+
}

0 commit comments

Comments
 (0)
This repository has been archived.