diff --git a/gcloud-java-core/pom.xml b/gcloud-java-core/pom.xml index 22648d2e6ffa..a2f9b40ad71e 100644 --- a/gcloud-java-core/pom.xml +++ b/gcloud-java-core/pom.xml @@ -98,5 +98,15 @@ 3.4 test + + com.google.protobuf + protobuf-java + 3.0.0-beta-1 + + + com.google.api + gax + 0.0.11 + diff --git a/gcloud-java-core/src/main/java/com/google/cloud/AsyncPage.java b/gcloud-java-core/src/main/java/com/google/cloud/AsyncPage.java new file mode 100644 index 000000000000..ea2905795e62 --- /dev/null +++ b/gcloud-java-core/src/main/java/com/google/cloud/AsyncPage.java @@ -0,0 +1,23 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud; + +import java.util.concurrent.Future; + +public interface AsyncPage extends Page { + Future> nextPageAsync(); +} diff --git a/gcloud-java-core/src/main/java/com/google/cloud/BaseServiceException.java b/gcloud-java-core/src/main/java/com/google/cloud/BaseServiceException.java index 6dc87f4abb3e..a2a1486f8dce 100644 --- a/gcloud-java-core/src/main/java/com/google/cloud/BaseServiceException.java +++ b/gcloud-java-core/src/main/java/com/google/cloud/BaseServiceException.java @@ -18,6 +18,7 @@ import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.gax.grpc.ApiException; import com.google.common.base.MoreObjects; import java.io.IOException; @@ -143,6 +144,16 @@ public BaseServiceException(int code, String message, String reason, boolean ide this.debugInfo = null; } + public BaseServiceException(ApiException apiException, boolean idempotent) { + super(apiException.getMessage(), apiException); + this.code = apiException.getStatusCode().value(); + this.reason = apiException.getStatusCode().name(); + this.idempotent = idempotent; + this.retryable = apiException.isRetryable(); + this.location = null; + this.debugInfo = null; + } + protected Set retryableErrors() { return Collections.emptySet(); } diff --git a/gcloud-java-core/src/main/java/com/google/cloud/ByteArray.java b/gcloud-java-core/src/main/java/com/google/cloud/ByteArray.java new file mode 100644 index 000000000000..f23fb0876b41 --- /dev/null +++ b/gcloud-java-core/src/main/java/com/google/cloud/ByteArray.java @@ -0,0 +1,165 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud; + +import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; +import com.google.protobuf.ByteString; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; + +/** + * An immutable byte array holder. + */ +public class ByteArray implements Iterable, Serializable { + + private static final long serialVersionUID = -1908809133893782840L; + private final ByteString byteString; + + protected ByteArray(ByteString byteString) { + this.byteString = byteString; + } + + protected ByteArray(ByteArray byteArray) { + this.byteString = byteArray.byteString(); + } + + @Override + public final Iterator iterator() { + return byteString.iterator(); + } + + @Override + public String toString() { + ToStringHelper toStringHelper = MoreObjects.toStringHelper(this); + StringBuilder stBuilder = new StringBuilder(); + for (int i = 0; i < Math.min(256, byteString.size()); i++) { + stBuilder.append(String.format("%02x", byteString.byteAt(i))); + } + if (byteString.size() > 256) { + stBuilder.append("..."); + } + return toStringHelper.add("bytes", stBuilder.toString()).toString(); + } + + @Override + public final int hashCode() { + return byteString.hashCode(); + } + + @Override + public final boolean equals(Object obj) { + return obj == this + || obj instanceof ByteArray && byteString.equals(((ByteArray) obj).byteString); + } + + /** + * Returns the size of this blob. + */ + public final int length() { + return byteString.size(); + } + + /** + * Returns a copy as byte array. + */ + public final byte[] toByteArray() { + return byteString.toByteArray(); + } + + /** + * Returns the content as {@code UTF-8} string. + */ + public final String toStringUtf8() { + return byteString.toStringUtf8(); + } + + /** + * Returns a read-only {@link ByteBuffer} for this blob content. + */ + public final ByteBuffer asReadOnlyByteBuffer() { + return byteString.asReadOnlyByteBuffer(); + } + + /** + * Returns an {@link InputStream} for this blob content. + */ + public final InputStream asInputStream() { + final ByteBuffer byteBuffer = asReadOnlyByteBuffer(); + return new InputStream() { + @Override public int read() { + return !byteBuffer.hasRemaining() ? -1 : byteBuffer.get() & 0xFF; + } + }; + } + + protected ByteString byteString() { + return byteString; + } + + /** + * Copies bytes into a ByteBuffer. + * + * @throws java.nio.ReadOnlyBufferException if the target is read-only + * @throws java.nio.BufferOverflowException if the target's remaining() space is not large + * enough to hold the data + */ + public final void copyTo(ByteBuffer target) { + byteString.copyTo(target); + } + + /** + * Copies bytes into a buffer. + * + * @throws IndexOutOfBoundsException if an offset or size is negative or too large + */ + public final void copyTo(byte[] target) { + byteString.copyTo(target, 0, 0, length()); + } + + public static final ByteArray copyFrom(byte[] bytes) { + return new ByteArray(ByteString.copyFrom(bytes)); + } + + /** + * Copy the bytes using {@code UTF-8} decoding. + */ + public static final ByteArray copyFrom(String string) { + return new ByteArray(ByteString.copyFrom(string, StandardCharsets.UTF_8)); + } + + public static final ByteArray copyFrom(ByteBuffer bytes) { + return new ByteArray(ByteString.copyFrom(bytes)); + } + + public static final ByteArray copyFrom(InputStream input) throws IOException { + BufferedInputStream bufferedInput = new BufferedInputStream(input); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + int value; + while ((value = bufferedInput.read()) != -1) { + bytes.write(value); + } + return copyFrom(bytes.toByteArray()); + } +} diff --git a/gcloud-java-pubsub/pom.xml b/gcloud-java-pubsub/pom.xml index 4a7485450433..8724ed627e1c 100644 --- a/gcloud-java-pubsub/pom.xml +++ b/gcloud-java-pubsub/pom.xml @@ -4,6 +4,7 @@ gcloud-java-pubsub jar GCloud Java Pub/Sub + https://github.com/GoogleCloudPlatform/gcloud-java/tree/master/gcloud-java-pubsub Java idiomatic client for Google Cloud Pub/Sub. @@ -16,6 +17,11 @@ gcloud-java-pubsub + + ${project.groupId} + gcloud-java-core + ${project.version} + com.google.api gax diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java new file mode 100644 index 000000000000..830f9115a41d --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Message.java @@ -0,0 +1,252 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.ByteArray; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Pub/Sub message. + */ +public class Message implements Serializable { + + private static final long serialVersionUID = -1436515787233340634L; + private static final long NANOS_PER_MILLISECOND = 1000000; + private static final long MILLIS_PER_SECOND = 1000; + + private final String id; + private final InternalByteArray payload; + private final ImmutableMap attributes; + private final Long publishTime; + + private static final class InternalByteArray extends ByteArray { + + private static final long serialVersionUID = -3330181485911805428L; + + protected InternalByteArray(ByteString byteString) { + super(byteString); + } + + protected InternalByteArray(ByteArray byteArray) { + super(byteArray); + } + + @Override + protected ByteString byteString() { + return super.byteString(); + } + } + + /** + * Builder for Message. + */ + public abstract static class Builder { + + abstract Builder id(String id); + + public abstract Builder payload(String payload); + + public abstract Builder payload(ByteArray payload); + + public abstract Builder attributes(Map attributes); + + public abstract Builder addAttribute(String name, String value); + + public abstract Builder removeAttribute(String name); + + public abstract Builder clearAttributes(); + + abstract Builder publishTime(long publishTime); + + public abstract Message build(); + } + + static final class BuilderImpl extends Builder { + + private String id = ""; + private ByteArray payload; + private Map attributes = new HashMap<>(); + private Long publishTime; + + private BuilderImpl() {} + + BuilderImpl(Message message) { + id = message.id; + payload = message.payload; + attributes = new HashMap<>(message.attributes); + publishTime = message.publishTime; + } + + @Override + BuilderImpl id(String id) { + this.id = checkNotNull(id); + return this; + } + + @Override + public Builder payload(String payload) { + return payload(ByteArray.copyFrom(payload)); + } + + @Override + public Builder payload(ByteArray payload) { + this.payload = payload; + return this; + } + + @Override + public Builder addAttribute(String name, String value) { + attributes.put(name, value); + return this; + } + + @Override + public Builder attributes(Map attributes) { + this.attributes = new HashMap<>(attributes); + return this; + } + + @Override + public Builder removeAttribute(String name) { + attributes.remove(name); + return this; + } + + @Override + public Builder clearAttributes() { + attributes.clear(); + return this; + } + + @Override + Builder publishTime(long publishTime) { + this.publishTime = publishTime; + return this; + } + + @Override + public Message build() { + return new Message(this); + } + } + + Message(BuilderImpl builder) { + id = builder.id; + payload = new InternalByteArray(checkNotNull(builder.payload)); + attributes = ImmutableMap.copyOf(builder.attributes); + publishTime = builder.publishTime; + } + + public Long publishTime() { + return publishTime; + } + + public Map attributes() { + return attributes; + } + + public String id() { + return id; + } + + public String payloadAsString() { + return payload.toStringUtf8(); + } + + public ByteArray payload() { + return payload; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return Objects.equals(toPb(), ((Message) o).toPb()); + } + + @Override + public int hashCode() { + return Objects.hash(serialVersionUID, id, payload, attributes, publishTime); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("payload", payload) + .add("attributes", attributes) + .add("publishTime", publishTime) + .toString(); + } + + PubsubMessage toPb() { + PubsubMessage.Builder builder = PubsubMessage.newBuilder(); + if (id != null) { + builder.setMessageId(id); + } + builder.setData(payload.byteString()); + builder.getAttributes().putAll(attributes); + Timestamp.Builder tsBuilder = Timestamp.newBuilder(); + tsBuilder.setSeconds(publishTime / MILLIS_PER_SECOND); + tsBuilder.setNanos((int) (publishTime % MILLIS_PER_SECOND * NANOS_PER_MILLISECOND)); + builder.setPublishTime(tsBuilder); + return builder.build(); + } + + static Message fromPb(PubsubMessage messagePb) { + Builder builder = builder(new InternalByteArray(messagePb.getData())); + if (messagePb.hasPublishTime()) { + Timestamp ts = messagePb.getPublishTime(); + builder.publishTime( + ts.getSeconds() * MILLIS_PER_SECOND + ts.getNanos() / NANOS_PER_MILLISECOND); + } + builder.id(messagePb.getMessageId()); + for (Map.Entry entry : messagePb.getAttributes().entrySet()) { + builder.addAttribute(entry.getKey(), entry.getValue()); + } + return builder.build(); + } + + public Builder toBuilder() { + return new BuilderImpl(this); + } + + public static Message of(String payload) { + return builder(payload).build(); + } + + public static Builder builder(String payload) { + return new BuilderImpl().payload(payload); + } + + public static Builder builder(ByteArray payload) { + return new BuilderImpl().payload(payload); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java new file mode 100644 index 000000000000..2695cd0064e8 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -0,0 +1,237 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.AsyncPage; +import com.google.cloud.Page; +import com.google.cloud.Service; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * An interface for Google Cloud Pub/Sub. + * + * @see Google Cloud Pub/Sub + */ +public interface PubSub extends Service { + + final class ListOption implements Serializable { + + private static final long serialVersionUID = 6517442127283383124L; + + private final Option option; + private final Object value; + + enum Option { + PAGE_SIZE, PAGE_TOKEN + } + + private ListOption(Option option, Object value) { + this.option = option; + this.value = value; + } + + Option option() { + return option; + } + + Object value() { + return value; + } + + public static ListOption pageSize(int pageSize) { + return new ListOption(Option.PAGE_SIZE, pageSize); + } + + public static ListOption pageToken(String pageToken) { + return new ListOption(Option.PAGE_TOKEN, pageToken); + } + } + + final class PullOption implements Serializable { + + private static final long serialVersionUID = -5220474819637439937L; + + private final Option option; + private final Object value; + + enum Option { + MAX_MESSAGES + } + + private PullOption(Option option, Object value) { + this.option = option; + this.value = value; + } + + Option option() { + return option; + } + + Object value() { + return value; + } + + public static PullOption maxMessages(int maxMessages) { + return new PullOption(Option.MAX_MESSAGES, maxMessages); + } + } + + /** + * A callback to process pulled messages. + * The message will be ack'ed upon successful return or nack'ed if exception is thrown. + */ + interface MessageProcessor { + void process(Message message) throws Exception; + } + + /** + * An interface to control message consumer settings. + */ + interface MessageConsumer extends AutoCloseable { + + final class PullOption implements Serializable { + + private static final long serialVersionUID = 4792164134340316582L; + + private final Option option; + private final Object value; + + enum Option { + MAX_CONCURRENT_CALLBACKS + } + + private PullOption(Option option, Object value) { + this.option = option; + this.value = value; + } + + Option option() { + return option; + } + + Object value() { + return value; + } + + public static PullOption maxConcurrentCallbacks(int maxConcurrency) { + return new PullOption(Option.MAX_CONCURRENT_CALLBACKS, maxConcurrency); + } + } + + void start(MessageConsumer.PullOption... options); + + void stop(); + } + + Topic create(TopicInfo topic); + + Future createAsync(TopicInfo topic); + + // null if not found + Topic getTopic(String topic); + + Future getTopicAsync(String topic); + + // false if not found + boolean deleteTopic(String topic); + + Future deleteTopicAsync(String topic); + + Page listTopics(ListOption... options); + + Future> listTopicsAsync(ListOption... options); + + String publish(String topic, Message message); + + Future publishAsync(String topic, Message message); + + List publish(String topic, Message message, Message... messages); + + Future> publishAsync(String topic, Message message, Message... messages); + + List publish(String topic, Iterable messages); + + Future> publishAsync(String topic, Iterable messages); + + Subscription create(SubscriptionInfo subscription); + + Future createAsync(SubscriptionInfo subscription); + + // null if not found + Subscription getSubscription(String subscription); + + Future getSubscriptionAsync(String subscription); + + void replacePushConfig(String subscription, PushConfig pushConfig); + + Future replacePushConfigAsync(String subscription, PushConfig pushConfig); + + // false if not found + boolean deleteSubscription(String subscription); + + Future deleteSubscriptionAsync(String subscription); + + Page listSubscriptions(ListOption... options); + + Future> listSubscriptionsAsync(ListOption... options); + + Page listSubscriptions(String topic, ListOption... options); + + Future> listSubscriptionsAsync(String topic, ListOption... options); + + Iterator pull(String subscription, PullOption... options); + + Future> pullAsync(String subscription, PullOption... options); + + MessageConsumer pullAsync(String subscription, MessageProcessor callback); + + void ack(String subscription, String ackId, String... ackIds); + + Future ackAsync(String subscription, String ackId, String... ackIds); + + void ack(String subscription, Iterable ackIds); + + Future ackAsync(String subscription, Iterable ackIds); + + void nack(String subscription, String ackId, String... ackIds); + + Future nackAsync(String subscription, String ackId, String... ackIds); + + void nack(String subscription, Iterable ackIds); + + Future nackAsync(String subscription, Iterable ackIds); + + void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId, + String... ackIds); + + Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, + String ackId, String... ackIds); + + void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, Iterable ackIds); + + Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, + Iterable ackIds); + + // IAM Policy operations: getPolicy, replacePolicy, testPermissions + // Not sure if ready (docs is not up-to-date) + // Looks like policy is per resource (topic or subscription) but not per service? +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubException.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubException.java new file mode 100644 index 000000000000..0ff6fa7e56c3 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubException.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.api.gax.grpc.ApiException; +import com.google.cloud.BaseServiceException; + +import java.io.IOException; +import java.util.Set; + +/** + * Pub/Sub service exception. + * + * @see Google Cloud Pub/Sub error codes + */ +public final class PubSubException extends BaseServiceException { + + private static final long serialVersionUID = 6434989638600001226L; + + public PubSubException(IOException ex, boolean idempotent) { + super(ex, idempotent); + } + + public PubSubException(ApiException apiException, boolean idempotent) { + super(apiException, idempotent); + } + + @Override + protected Set retryableErrors() { + return null; + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubFactory.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubFactory.java new file mode 100644 index 000000000000..8aa073f3a10a --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.ServiceFactory; + +/** + * An interface for Pub/Sub factories. + */ +public interface PubSubFactory + extends ServiceFactory {} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java new file mode 100644 index 000000000000..bc9e428b6ab1 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -0,0 +1,275 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.util.concurrent.Futures.lazyTransform; + +import com.google.cloud.AsyncPage; +import com.google.cloud.BaseService; +import com.google.cloud.Page; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +class PubSubImpl extends BaseService implements PubSub { + + private final PubSubRpc rpc; + + PubSubImpl(PubSubOptions options) { + super(options); + rpc = options.rpc(); + } + + private static V get(Future future) { + try { + return Uninterruptibles.getUninterruptibly(future); + } catch (ExecutionException ex) { + throw Throwables.propagate(ex.getCause()); + } + } + + @Override + public Topic create(TopicInfo topic) { + return get(createAsync(topic)); + } + + @Override + public Future createAsync(TopicInfo topic) { + return lazyTransform(rpc.create(topic.toPb()), Topic.fromPbFunction(this)); + } + + @Override + public Topic getTopic(String topic) { + return get(getTopicAsync(topic)); + } + + @Override + public Future getTopicAsync(String topic) { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic).build(); + return lazyTransform(rpc.get(request), Topic.fromPbFunction(this)); + } + + @Override + public boolean deleteTopic(String topic) { + return get(deleteTopicAsync(topic)); + } + + @Override + public Future deleteTopicAsync(String topic) { + DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(topic).build(); + return lazyTransform(rpc.delete(request), new Function() { + @Override + public Boolean apply(Empty input) { + return true; + } + }); + } + + @Override + public Page listTopics(ListOption... options) { + return null; + } + + @Override + public Future> listTopicsAsync(ListOption... options) { + return null; + } + + @Override + public String publish(String topic, Message message) { + return null; + } + + @Override + public Future publishAsync(String topic, Message message) { + return null; + } + + @Override + public List publish(String topic, Message message, Message... messages) { + return null; + } + + @Override + public Future> publishAsync(String topic, Message message, Message... messages) { + return null; + } + + @Override + public List publish(String topic, Iterable messages) { + return null; + } + + @Override + public Future> publishAsync(String topic, Iterable messages) { + return null; + } + + @Override + public Subscription create(SubscriptionInfo subscription) { + return null; + } + + @Override + public Future createAsync(SubscriptionInfo subscription) { + return null; + } + + @Override + public Subscription getSubscription(String subscription) { + return null; + } + + @Override + public Future getSubscriptionAsync(String subscription) { + return null; + } + + @Override + public void replacePushConfig(String subscription, PushConfig pushConfig) { + + } + + @Override + public Future replacePushConfigAsync(String subscription, PushConfig pushConfig) { + return null; + } + + @Override + public boolean deleteSubscription(String subscription) { + return false; + } + + @Override + public Future deleteSubscriptionAsync(String subscription) { + return null; + } + + @Override + public Page listSubscriptions(ListOption... options) { + return null; + } + + @Override + public Future> listSubscriptionsAsync(ListOption... options) { + return null; + } + + @Override + public Page listSubscriptions(String topic, ListOption... options) { + return null; + } + + @Override + public Future> listSubscriptionsAsync(String topic, + ListOption... options) { + return null; + } + + @Override + public Iterator pull(String subscription, PullOption... options) { + // this should set return_immediately to true + return null; + } + + @Override + public Future> pullAsync(String subscription, PullOption... options) { + // though this method can set return_immediately to false (as future can be canceled) I + // suggest to keep it false so sync could delegate to asyc and use the same options + // this method also should use the VTKIT thread-pool to renew ack deadline for non consumed + // messages + return null; + } + + @Override + public MessageConsumer pullAsync(String subscription, MessageProcessor callback) { + // this method should use the VTKIT thread-pool (maybe getting it should be part of the spi) + return null; + } + + @Override + public void ack(String subscription, String ackId, String... ackIds) { + } + + @Override + public Future ackAsync(String subscription, String ackId, String... ackIds) { + return null; + } + + @Override + public void ack(String subscription, Iterable ackIds) { + + } + + @Override + public Future ackAsync(String subscription, Iterable ackIds) { + return null; + } + + @Override + public void nack(String subscription, String ackId, String... ackIds) { + } + + @Override + public Future nackAsync(String subscription, String ackId, String... ackIds) { + return null; + } + + @Override + public void nack(String subscription, Iterable ackIds) { + + } + + @Override + public Future nackAsync(String subscription, Iterable ackIds) { + return null; + } + + @Override + public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId, + String... ackIds) { + + } + + @Override + public Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, + String ackId, String... ackIds) { + return null; + } + + @Override + public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, + Iterable ackIds) { + + } + + @Override + public Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, + Iterable ackIds) { + return null; + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java new file mode 100644 index 000000000000..73482ccef25f --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -0,0 +1,121 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.spi.DefaultPubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpcFactory; +import com.google.common.collect.ImmutableSet; + +import java.io.IOException; +import java.util.Set; + +public class PubSubOptions extends ServiceOptions { + + private static final long serialVersionUID = 6740347843343421456L; + private static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub"; + private static final Set SCOPES = ImmutableSet.of(PUBSUB_SCOPE); + private static final String DEFAULT_HOST = "https://pubsub.googleapis.com"; + + public static class DefaultPubSubFactory implements PubSubFactory { + private static final PubSubFactory INSTANCE = new DefaultPubSubFactory(); + + @Override + public PubSub create(PubSubOptions options) { + return new PubSubImpl(options); + } + } + + /** + * Returns a default {@code PubSubOptions} instance. + */ + public static PubSubOptions defaultInstance() { + return builder().build(); + } + + public static class DefaultPubSubRpcFactory implements PubSubRpcFactory { + private static final PubSubRpcFactory INSTANCE = new DefaultPubSubRpcFactory(); + + @Override + public PubSubRpc create(PubSubOptions options) { + try { + return new DefaultPubSubRpc(options); + } catch (IOException e) { + throw new PubSubException(e, true); + } + } + } + + @Override + protected String defaultHost() { + return DEFAULT_HOST; + } + + public static class Builder extends + ServiceOptions.Builder { + + private Builder() {} + + private Builder(PubSubOptions options) { + super(options); + } + + @Override + public PubSubOptions build() { + return new PubSubOptions(this); + } + } + + private PubSubOptions(Builder builder) { + super(PubSubFactory.class, PubSubRpcFactory.class, builder); + } + + @Override + protected PubSubFactory defaultServiceFactory() { + return DefaultPubSubFactory.INSTANCE; + } + + @Override + protected PubSubRpcFactory defaultRpcFactory() { + return DefaultPubSubRpcFactory.INSTANCE; + } + + @Override + protected Set scopes() { + return SCOPES; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof PubSubOptions && baseEquals((PubSubOptions) obj); + } + + @Override + public int hashCode() { + return baseHashCode(); + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PushConfig.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PushConfig.java new file mode 100644 index 000000000000..61b64a07b36b --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PushConfig.java @@ -0,0 +1,142 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * PubSub subscription push configuration. + */ +public final class PushConfig implements Serializable { + + private static final long serialVersionUID = 4408885787064092231L; + + private final String endpoint; + private final ImmutableMap attributes; + + public static final class Builder { + + private String endpoint; + private final Map attributes = new HashMap<>(); + + private Builder() { + } + + public Builder endPoint(String endpoint) { + this.endpoint = checkNotNull(endpoint); + return this; + } + + public Builder addAttribute(String name, String value) { + attributes.put(name, value); + return this; + } + + public Builder removeAttribute(String name) { + attributes.remove(name); + return this; + } + + public Builder clearAttributes() { + attributes.clear(); + return this; + } + + public PushConfig build() { + return new PushConfig(this); + } + } + + private PushConfig(Builder builder) { + endpoint = builder.endpoint; + attributes = ImmutableMap.copyOf(builder.attributes); + } + + public String endpoint() { + return endpoint; + } + + public Map attributes() { + return attributes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PushConfig that = (PushConfig) o; + return Objects.equals(endpoint, that.endpoint) && Objects.equals(attributes, that.attributes); + } + + @Override + public int hashCode() { + return Objects.hash(endpoint, attributes); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("attributes", attributes) + .add("endpoint", endpoint) + .toString(); + } + + public Builder toBuilder() { + return builder(endpoint, attributes); + } + + public static PushConfig of(String endpoint) { + return builder(endpoint).build(); + } + + public static PushConfig of(String endpoint, Map attributes) { + return builder(endpoint, attributes).build(); + } + + public static Builder builder(String endPoint) { + return new Builder().endPoint(endPoint); + } + + public static Builder builder(String endpoint, Map attributes) { + Builder builder = builder(endpoint); + for (Map.Entry entry : attributes.entrySet()) { + builder.addAttribute(entry.getKey(), entry.getValue()); + } + return builder; + } + + com.google.pubsub.v1.PushConfig toPb() { + return com.google.pubsub.v1.PushConfig.newBuilder().setPushEndpoint(endpoint) + .putAllAttributes(attributes).build(); + } + + static PushConfig fromPb(com.google.pubsub.v1.PushConfig pushConfigPb) { + return builder(pushConfigPb.getPushEndpoint(), pushConfigPb.getAttributes()).build(); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java new file mode 100644 index 000000000000..c3cb705cb6d7 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java @@ -0,0 +1,184 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.ByteArray; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class ReceivedMessage extends Message { + + private static final long serialVersionUID = -4178477763916251733L; + + private final String subscription; + private final String ackId; + private transient PubSub pubsub; + private final PubSubOptions options; + + public static final class Builder extends Message.Builder { + + private final String subscription; + private final String ackId; + private final PubSub pubsub; + private final BuilderImpl delegate; + + private Builder(String subscription, String ackId, PubSub pubsub, BuilderImpl delegate) { + this.subscription = subscription; + this.ackId = ackId; + this.pubsub = pubsub; + this.delegate = delegate; + } + + @Override + Builder id(String id) { + delegate.id(id); + return this; + } + + @Override + public Builder payload(String payload) { + delegate.payload(payload); + return this; + } + + @Override + public Builder payload(ByteArray payload) { + delegate.payload(payload); + return this; + } + + @Override + public Builder attributes(Map attributes) { + delegate.attributes(attributes); + return this; + } + + @Override + public Builder addAttribute(String name, String value) { + delegate.addAttribute(name, value); + return this; + } + + @Override + public Builder removeAttribute(String name) { + delegate.removeAttribute(name); + return this; + } + + @Override + public Builder clearAttributes() { + delegate.clearAttributes(); + return this; + } + + @Override + Builder publishTime(long publishTime) { + delegate.publishTime(publishTime); + return this; + } + + @Override + public ReceivedMessage build() { + return new ReceivedMessage(this); + } + } + + ReceivedMessage(Builder builder) { + super(builder.delegate); + subscription = checkNotNull(builder.subscription); + ackId = checkNotNull(builder.ackId); + pubsub = checkNotNull(builder.pubsub); + options = pubsub.options(); + } + + @Override + public Builder toBuilder() { + return new Builder(subscription, ackId, pubsub, new BuilderImpl(this)); + } + + @Override + public int hashCode() { + return Objects.hash(options, super.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ReceivedMessage other = (ReceivedMessage) obj; + return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options); + } + + public PubSub pubSub() { + return pubsub; + } + + public String subscription() { + return subscription; + } + + public String ackId() { + return ackId; + } + + public void ack() { + pubsub.ack(subscription, ackId); + } + + public Future ackAsync() { + return pubsub.ackAsync(subscription, ackId); + } + + public void nack() { + pubsub.nack(subscription, ackId); + } + + public Future nackAsync() { + return pubsub.nackAsync(subscription, ackId); + } + + public void modifyAckDeadline(int deadline, TimeUnit unit) { + pubsub.modifyAckDeadline(subscription, deadline, unit, ackId); + } + + public Future modifyAckDeadlineAsync(int deadline, TimeUnit unit) { + return pubsub.modifyAckDeadlineAsync(subscription, deadline, unit, ackId); + } + + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { + input.defaultReadObject(); + this.pubsub = options.service(); + } + + static ReceivedMessage fromPb(PubSub storage, String subscription, + com.google.pubsub.v1.ReceivedMessage msgPb) { + Message message = fromPb(msgPb.getMessage()); + String ackId = msgPb.getAckId(); + return new Builder(subscription, ackId, storage, new BuilderImpl(message)).build(); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java new file mode 100644 index 000000000000..d49328ad7486 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -0,0 +1,158 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.Future; + +/** + * PubSub subscription. + */ +public class Subscription extends SubscriptionInfo { + + private static final long serialVersionUID = -4153366055659552230L; + + private final PubSubOptions options; + private transient PubSub pubsub; + + public static final class Builder extends SubscriptionInfo.Builder { + + private final PubSub pubsub; + private final BuilderImpl delegate; + + private Builder(Subscription subscription) { + pubsub = subscription.pubsub; + delegate = new BuilderImpl(subscription); + } + + @Override + public Builder topic(String name) { + delegate.topic(name); + return this; + } + + @Override + public Builder name(String name) { + delegate.name(name); + return this; + } + + @Override + public Builder pushConfig(PushConfig pushConfig) { + delegate.pushConfig(pushConfig); + return this; + } + + @Override + public Builder ackDeadLineSeconds(int ackDeadLineSeconds) { + delegate.ackDeadLineSeconds(ackDeadLineSeconds); + return this; + } + + @Override + public Subscription build() { + return new Subscription(this.pubsub, this.delegate); + } + } + + Subscription(PubSub pubsub, BuilderImpl builder) { + super(builder); + this.pubsub = checkNotNull(pubsub); + options = pubsub.options(); + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + @Override + public int hashCode() { + return Objects.hash(options, super.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Subscription other = (Subscription) obj; + return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options); + } + + public PubSub pubSub() { + return pubsub; + } + + public boolean delete() { + return pubsub.deleteSubscription(name()); + } + + public Future deleteAsync() { + return pubsub.deleteSubscriptionAsync(name()); + } + + public Subscription reload() { + return pubsub.getSubscription(name()); + } + + public Future reloadAsync() { + return pubsub.getSubscriptionAsync(name()); + } + + public void replacePushConfig(PushConfig pushConfig) { + pubsub.replacePushConfig(name(), pushConfig); + } + + public Future replacePushConfigAsync(PushConfig pushConfig) { + return pubsub.replacePushConfigAsync(name(), pushConfig); + } + + public Iterator pull(PullOption... options) { + return pubsub.pull(name(), options); + } + + public Future> pullAsync(PullOption... options) { + return pubsub.pullAsync(name(), options); + } + + public MessageConsumer pullAsync(MessageProcessor callback) { + return pubsub.pullAsync(name(), callback); + } + + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { + input.defaultReadObject(); + this.pubsub = options.service(); + } + + static Subscription fromPb(PubSub storage, com.google.pubsub.v1.Subscription subscriptionPb) { + SubscriptionInfo subscriptionInfo = SubscriptionInfo.fromPb(subscriptionPb); + return new Subscription(storage, new BuilderImpl(subscriptionInfo)); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java new file mode 100644 index 000000000000..97e7b35becd9 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java @@ -0,0 +1,189 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Pub/Sub subscription information. + */ +public class SubscriptionInfo implements Serializable { + + private static final long serialVersionUID = 1860057426574127128L; + + private final String name; + private final String topic; + private final PushConfig pushConfig; + private final int ackDeadlineSeconds; + + + /** + * Builder for Subscription. + */ + public abstract static class Builder { + + public abstract Builder name(String name); + + public abstract Builder topic(String name); + + public abstract Builder pushConfig(PushConfig pushConfig); + + public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds); + + public abstract SubscriptionInfo build(); + } + + static final class BuilderImpl extends Builder { + + private String name; + private String topic; + private PushConfig pushConfig; + private int ackDeadlineSeconds; + + private BuilderImpl(String topic, String name) { + this.topic = checkNotNull(topic); + this.name = checkNotNull(name); + } + + BuilderImpl(SubscriptionInfo subscription) { + name = subscription.name; + topic = subscription.topic; + pushConfig = subscription.pushConfig; + ackDeadlineSeconds = subscription.ackDeadlineSeconds; + } + + @Override + public Builder name(String name) { + this.name = checkNotNull(name); + return this; + } + + @Override + public Builder topic(String topic) { + this.topic = checkNotNull(topic); + return this; + } + + @Override + public Builder pushConfig(PushConfig pushConfig) { + this.pushConfig = pushConfig; + return this; + } + + @Override + public Builder ackDeadLineSeconds(int ackDeadlineSeconds) { + this.ackDeadlineSeconds = ackDeadlineSeconds; + return this; + } + + @Override + public SubscriptionInfo build() { + return new SubscriptionInfo(this); + } + } + + SubscriptionInfo(BuilderImpl builder) { + topic = builder.topic; + name = builder.name; + pushConfig = builder.pushConfig; + ackDeadlineSeconds = builder.ackDeadlineSeconds; + } + + public String topic() { + return topic; + } + + public String name() { + return name; + } + + public PushConfig pushConfig() { + return pushConfig; + } + + public long ackDeadlineSeconds() { + return ackDeadlineSeconds; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return Objects.equals(toPb(), ((SubscriptionInfo) o).toPb()); + } + + @Override + public int hashCode() { + return Objects.hash(topic, name, pushConfig, ackDeadlineSeconds); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("topic", topic) + .add("name", name) + .add("pushConfig", pushConfig) + .add("ackDeadlineSeconds", ackDeadlineSeconds) + .toString(); + } + + com.google.pubsub.v1.Subscription toPb() { + com.google.pubsub.v1.Subscription.Builder builder = + com.google.pubsub.v1.Subscription.newBuilder(); + builder.setTopic(topic); + builder.setName(name); + builder.setAckDeadlineSeconds(ackDeadlineSeconds); + if (pushConfig != null) { + builder.setPushConfig(pushConfig.toPb()); + } + return builder.build(); + } + + static SubscriptionInfo fromPb(com.google.pubsub.v1.Subscription subscription) { + Builder builder = builder(subscription.getTopic(), subscription.getName()); + builder.ackDeadLineSeconds(subscription.getAckDeadlineSeconds()); + if (subscription.hasPushConfig()) { + builder.pushConfig(PushConfig.fromPb(subscription.getPushConfig())); + } + return builder.build(); + } + + public Builder toBuilder() { + return new BuilderImpl(this); + } + + public static SubscriptionInfo of(String topic, String name) { + return builder(topic, name).build(); + } + + public static SubscriptionInfo of(String topic, String name, String endpoint) { + return builder(topic, name).pushConfig(PushConfig.of(endpoint)).build(); + } + + public static Builder builder(String topic, String name) { + return new BuilderImpl(topic, name); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java new file mode 100644 index 000000000000..77dd31fcd876 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java @@ -0,0 +1,162 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.AsyncPage; +import com.google.cloud.Page; +import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.common.base.Function; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Future; + +/** + * PubSub Topic. + */ +public class Topic extends TopicInfo { + + private static final long serialVersionUID = -2686692223763315944L; + + private final PubSubOptions options; + private transient PubSub pubsub; + + public static final class Builder extends TopicInfo.Builder { + + private final PubSub pubsub; + private final BuilderImpl delegate; + + private Builder(Topic topic) { + pubsub = topic.pubsub; + delegate = new BuilderImpl(topic); + } + + @Override + public Builder name(String name) { + delegate.name(name); + return this; + } + + @Override + public Topic build() { + return new Topic(this.pubsub, this.delegate); + } + } + + Topic(PubSub pubsub, BuilderImpl builder) { + super(builder); + this.pubsub = checkNotNull(pubsub); + options = pubsub.options(); + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + @Override + public int hashCode() { + return Objects.hash(options, super.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Topic other = (Topic) obj; + return Objects.equals(toPb(), other.toPb()) && Objects.equals(options, other.options); + } + + public PubSub pubSub() { + return pubsub; + } + + public boolean delete() { + return pubsub.deleteTopic(name()); + } + + public Future deleteAsync() { + return pubsub.deleteTopicAsync(name()); + } + + public Topic reload() { + return pubsub.getTopic(name()); + } + + public Future reloadAsync() { + return pubsub.getTopicAsync(name()); + } + + public String publish(Message message) { + return pubsub.publish(name(), message); + } + + public Future publishAsync(Message message) { + return pubsub.publishAsync(name(), message); + } + + public List publish(Message message, Message... messages) { + return pubsub.publish(name(), message, messages); + } + + public Future> publishAsync(Message message, Message... messages) { + return pubsub.publishAsync(name(), message, messages); + } + + public List publish(Iterable messages) { + return pubsub.publish(name(), messages); + } + + public Future> publishAsync(Iterable messages) { + return pubsub.publishAsync(name(), messages); + } + + public Page listSubscriptions(ListOption... options) { + return pubsub.listSubscriptions(name(), options); + } + + public Future> listSubscriptionsAsync(ListOption... options) { + return pubsub.listSubscriptionsAsync(name(), options); + } + + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { + input.defaultReadObject(); + this.pubsub = options.service(); + } + + static Topic fromPb(PubSub storage, com.google.pubsub.v1.Topic topicPb) { + TopicInfo topicInfo = TopicInfo.fromPb(topicPb); + return new Topic(storage, new BuilderImpl(topicInfo)); + } + + static Function fromPbFunction(final PubSub pubsub) { + return new Function() { + @Override + public Topic apply(com.google.pubsub.v1.Topic topicPb) { + return fromPb(pubsub, topicPb); + } + }; + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java new file mode 100644 index 000000000000..37f4ae289b6f --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java @@ -0,0 +1,119 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Pub/Sub Topic information. + */ +public class TopicInfo implements Serializable { + + private static final long serialVersionUID = -5907624842808725353L; + + private final String name; + + /** + * Builder for TopicInfo. + */ + public abstract static class Builder { + + public abstract Builder name(String name); + + public abstract TopicInfo build(); + } + + static final class BuilderImpl extends Builder { + + private String name; + + BuilderImpl(String name) { + this.name = checkNotNull(name); + } + + BuilderImpl(TopicInfo topicInfo) { + this.name = topicInfo.name; + } + + @Override + public Builder name(String name) { + this.name = checkNotNull(name); + return this; + } + + @Override + public TopicInfo build() { + return new TopicInfo(this); + } + } + + TopicInfo(BuilderImpl builder) { + name = builder.name; + } + + public String name() { + return name; + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + return Objects.equals(toPb(), ((TopicInfo) obj).toPb()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .toString(); + } + + com.google.pubsub.v1.Topic toPb() { + return com.google.pubsub.v1.Topic.newBuilder().setName(name).build(); + } + + static TopicInfo fromPb(com.google.pubsub.v1.Topic topicPb) { + return builder(topicPb.getName()).build(); + } + + public Builder toBuilder() { + return new BuilderImpl(this); + } + + public static TopicInfo of(String name) { + return builder(name).build(); + } + + public static Builder builder(String name) { + return new BuilderImpl(name); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/package-info.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/package-info.java new file mode 100644 index 000000000000..05dc8dcbf036 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/package-info.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A client to Google Cloud Pub/Sub. + * + *

Here's a simple usage example for using gcloud-pubsub: + * todo: add example + *

 {@code
+ * PubSub pubsub = PubSubOptions.defaultInstance().service();
+ * }
+ * + * @see Google Cloud Pub/Sub + */ + +package com.google.cloud.pubsub; diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java new file mode 100644 index 000000000000..ebe83841a4ac --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -0,0 +1,199 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi; + +import com.google.api.gax.core.ConnectionSettings; +import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.grpc.ApiCallSettings; +import com.google.api.gax.grpc.ApiException; +import com.google.cloud.RetryParams; +import com.google.cloud.pubsub.PubSubException; +import com.google.cloud.pubsub.PubSubOptions; +import com.google.cloud.pubsub.spi.v1.PublisherApi; +import com.google.cloud.pubsub.spi.v1.PublisherSettings; +import com.google.cloud.pubsub.spi.v1.SubscriberApi; +import com.google.cloud.pubsub.spi.v1.SubscriberSettings; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.ModifyPushConfigRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; + +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.Future; + +import autovalue.shaded.com.google.common.common.collect.Sets; +import io.grpc.Status.Code; + +public class DefaultPubSubRpc implements PubSubRpc { + + private final PublisherApi publisherApi; + private final SubscriberApi subscriberApi; + + public DefaultPubSubRpc(PubSubOptions options) throws IOException { + try { + // Provide (and use a common thread-pool). + // This depends on https://github.com/googleapis/gax-java/issues/73 + PublisherSettings.Builder pbuilder = PublisherSettings.defaultInstance().toBuilder(); + pbuilder.provideChannelWith(ConnectionSettings.newBuilder() + .provideCredentialsWith(options.authCredentials().credentials()).build()); + pbuilder.applyToAllApiMethods(apiCallSettings(options)); + publisherApi = PublisherApi.create(pbuilder.build()); + SubscriberSettings.Builder sBuilder = SubscriberSettings.defaultInstance().toBuilder(); + sBuilder.provideChannelWith(ConnectionSettings.newBuilder() + .provideCredentialsWith(options.authCredentials().credentials()).build()); + sBuilder.applyToAllApiMethods(apiCallSettings(options)); + subscriberApi = SubscriberApi.create(sBuilder.build()); + } catch (Exception ex) { + throw new IOException(ex); + } + } + + private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) { + // TODO: specify timeout these settings: + // retryParams.retryMaxAttempts(), retryParams.retryMinAttempts() + RetryParams retryParams = options.retryParams(); + final RetrySettings.Builder builder = RetrySettings.newBuilder() + .setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis())) + .setInitialRpcTimeout(Duration.millis(options.connectTimeout())) + .setRpcTimeoutMultiplier(1.5) + .setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout())) + .setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis())) + .setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor()) + .setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis())); + return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder); + } + + private static Future translate(ListenableFuture from, final boolean idempotent, + int... returnNullOn) { + final Set returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length); + for (int value : returnNullOn) { + returnNullOnSet.add(value); + } + return Futures.catching(from, ApiException.class, new Function() { + @Override + public V apply(ApiException exception) { + if (returnNullOnSet.contains(exception.getStatusCode().value())) { + return null; + } + throw new PubSubException(exception, idempotent); + } + }); + } + + @Override + public Future create(Topic topic) { + // TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings + // or from the exception + return translate(publisherApi.createTopicCallable().futureCall(topic), true); + } + + @Override + public Future publish(PublishRequest request) { + return translate(publisherApi.publishCallable().futureCall(request), false); + } + + @Override + public Future get(GetTopicRequest request) { + return translate(publisherApi.getTopicCallable().futureCall(request), true, + Code.NOT_FOUND.value()); + } + + @Override + public Future list(ListTopicsRequest request) { + // we should consider using gax PageAccessor once + // https://github.com/googleapis/gax-java/issues/74 is fixed + // Though it is a cleaner SPI without it, but PageAccessor is an interface + // and if it saves code we should not easily dismiss it. + return translate(publisherApi.listTopicsCallable().futureCall(request), true); + } + + @Override + public Future list(ListTopicSubscriptionsRequest request) { + return translate(publisherApi.listTopicSubscriptionsCallable().futureCall(request), true); + } + + @Override + public Future delete(DeleteTopicRequest request) { + // TODO: check if null is not going to work for Empty + return translate(publisherApi.deleteTopicCallable().futureCall(request), true, + Code.NOT_FOUND.value()); + } + + @Override + public Future create(Subscription subscription) { + return translate(subscriberApi.createSubscriptionCallable().futureCall(subscription), false); + } + + @Override + public Future get(GetSubscriptionRequest request) { + return translate(subscriberApi.getSubscriptionCallable().futureCall(request), true, + Code.NOT_FOUND.value()); + } + + @Override + public Future list(ListSubscriptionsRequest request) { + return translate(subscriberApi.listSubscriptionsCallable().futureCall(request), true); + } + + @Override + public Future delete(DeleteSubscriptionRequest request) { + return translate(subscriberApi.deleteSubscriptionCallable().futureCall(request), true, + Code.NOT_FOUND.value()); + } + + @Override + public Future modify(ModifyAckDeadlineRequest request) { + return translate(subscriberApi.modifyAckDeadlineCallable().futureCall(request), false); + } + + @Override + public Future acknowledge(AcknowledgeRequest request) { + return translate(subscriberApi.acknowledgeCallable().futureCall(request), false); + } + + @Override + public Future pull(PullRequest request) { + return translate(subscriberApi.pullCallable().futureCall(request), false); + } + + @Override + public Future modify(ModifyPushConfigRequest request) { + return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java new file mode 100644 index 000000000000..8474ba042234 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java @@ -0,0 +1,72 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.ModifyPushConfigRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; + +import java.util.concurrent.Future; + +public interface PubSubRpc { + + // in all cases root cause of ExecutionException is PubSubException + Future create(Topic topic); + + Future publish(PublishRequest request); + + Future get(GetTopicRequest request); + + Future list(ListTopicsRequest request); + + Future list(ListTopicSubscriptionsRequest request); + + Future delete(DeleteTopicRequest request); + + Future create(Subscription subscription); + + Future get(GetSubscriptionRequest request); + + Future list(ListSubscriptionsRequest request); + + Future delete(DeleteSubscriptionRequest request); + + Future modify(ModifyAckDeadlineRequest request); + + Future acknowledge(AcknowledgeRequest request); + + Future pull(PullRequest request); + + Future modify(ModifyPushConfigRequest request); +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpcFactory.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpcFactory.java new file mode 100644 index 000000000000..d3648a68399f --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpcFactory.java @@ -0,0 +1,27 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.spi; + +import com.google.cloud.pubsub.PubSubOptions; +import com.google.cloud.spi.ServiceRpcFactory; + +/** + * An interface for Pub/Sub RPC factory. + * Implementation will be loaded via {@link java.util.ServiceLoader}. + */ +public interface PubSubRpcFactory extends ServiceRpcFactory { +}