Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PubSub Iterator pullAsync: add callback to PubSubRpc.pull #1048

Merged
merged 2 commits into from
Jun 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import static com.google.api.client.util.Preconditions.checkArgument;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
Expand All @@ -27,6 +27,7 @@
import com.google.cloud.Page;
import com.google.cloud.PageImpl;
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
import com.google.cloud.pubsub.spi.v1.PublisherApi;
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -476,15 +477,24 @@ public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, in
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.build();
Future<PullResponse> response = rpc.pull(request);
return lazyTransform(response, new Function<PullResponse, Iterator<ReceivedMessage>>() {
PullFuture future = rpc.pull(request);
future.addCallback(new PubSubRpc.PullCallback() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse pullResponse) {
// Add all received messages to the automatic ack deadline renewer
List<String> ackIds = Lists.transform(pullResponse.getReceivedMessagesList(),
public void success(PullResponse response) {
List<String> ackIds = Lists.transform(response.getReceivedMessagesList(),
MESSAGE_TO_ACK_ID_FUNCTION);
ackDeadlineRenewer.add(subscription, ackIds);
return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(),
}

@Override
public void failure(Throwable error) {
// ignore
}
});
return lazyTransform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse response) {
return Iterators.transform(response.getReceivedMessagesList().iterator(),
new Function<com.google.pubsub.v1.ReceivedMessage, ReceivedMessage>() {
@Override
public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -89,6 +91,30 @@ protected ExecutorFactory executorFactory() {
}
}

private static final class PullFutureImpl
extends ForwardingListenableFuture.SimpleForwardingListenableFuture<PullResponse>
implements PullFuture {

PullFutureImpl(ListenableFuture<PullResponse> delegate) {
super(delegate);
}

@Override
public void addCallback(final PullCallback callback) {
Futures.addCallback(delegate(), new FutureCallback<PullResponse>() {
@Override
public void onSuccess(PullResponse result) {
callback.success(result);
}

@Override
public void onFailure(Throwable error) {
callback.failure(error);
}
});
}
}

public DefaultPubSubRpc(PubSubOptions options) throws IOException {
executorFactory = new InternalPubSubOptions(options).executorFactory();
executor = executorFactory.get();
Expand Down Expand Up @@ -136,13 +162,13 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder);
}

private static <V> Future<V> translate(ListenableFuture<V> from, final boolean idempotent,
int... returnNullOn) {
private static <V> ListenableFuture<V> translate(ListenableFuture<V> from,
final boolean idempotent, int... returnNullOn) {
final Set<Integer> returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length);
for (int value : returnNullOn) {
returnNullOnSet.add(value);
}
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
Expand Down Expand Up @@ -224,8 +250,8 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
}

@Override
public Future<PullResponse> pull(PullRequest request) {
return translate(subscriberApi.pullCallable().futureCall(request), false);
public PullFuture pull(PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,44 @@

public interface PubSubRpc extends AutoCloseable {

/**
* A callback that can be registered to {@link PullFuture} objects. Objects of this class allow
* to asynchronously react to the success or failure of a pull RPC.
*/
interface PullCallback {

/**
* This method is invoked with the result of a {@link PullFuture} when it was successful.
*
* @param response the pull response
*/
void success(PullResponse response);

/**
* This method is invoked when the {@link PullFuture} failed or was cancelled.
*
* @param error the execption that caused the {@link PullFuture} to fail
*/
void failure(Throwable error);
}

/**
* A {@link Future} implementation for pull RPCs. This class also allows users to register
* callbacks via {@link #addCallback(PullCallback)}.
*/
interface PullFuture extends Future<PullResponse> {

/**
* Registers a callback to be run on the given executor. The listener will run when the pull
* future completed its computation or, if the computation is already complete, immediately.
* There is no guaranteed ordering of execution of callbacks.
*
* <p>Registered callbacks are run using the same thread that run the RPC call. Only lightweight
* callbacks should be registered via this method.
*/
void addCallback(final PullCallback callback);
}

// in all cases root cause of ExecutionException is PubSubException
Future<Topic> create(Topic topic);

Expand All @@ -66,7 +104,7 @@ public interface PubSubRpc extends AutoCloseable {

Future<Empty> acknowledge(AcknowledgeRequest request);

Future<PullResponse> pull(PullRequest request);
PullFuture pull(PullRequest request);

Future<Empty> modify(ModifyPushConfigRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.client.util.Lists;
import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.junit.Ignore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSub.ListOption;
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback;
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
import com.google.cloud.pubsub.spi.PubSubRpcFactory;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
Expand All @@ -55,13 +58,15 @@
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;

import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1229,7 +1234,7 @@ public void testListTopicSubscriptionsAsyncWithOptions()
}

@Test
public void testPullMessages() {
public void testPullMessages() throws ExecutionException, InterruptedException {
pubsub = new PubSubImpl(options, renewerMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
Expand All @@ -1243,10 +1248,16 @@ public void testPullMessages() {
.addReceivedMessages(MESSAGE_PB1)
.addReceivedMessages(MESSAGE_PB2)
.build();
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response));
Capture<PullCallback> callback = Capture.newInstance();
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.capture(callback));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andReturn(response);
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2"));
EasyMock.replay(pubsubRpcMock, renewerMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
Iterator<ReceivedMessage> messageIterator = pubsub.pull(SUBSCRIPTION, 42);
callback.getValue().success(response);
EasyMock.reset(renewerMock);
for (ReceivedMessage message : messageList) {
renewerMock.remove(SUBSCRIPTION, message.ackId());
Expand All @@ -1256,6 +1267,7 @@ public void testPullMessages() {
while (messageIterator.hasNext()) {
messageIterator.next();
}
EasyMock.verify(futureMock);
}

@Test
Expand All @@ -1273,10 +1285,16 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
.addReceivedMessages(MESSAGE_PB1)
.addReceivedMessages(MESSAGE_PB2)
.build();
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response));
Capture<PullCallback> callback = Capture.newInstance();
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.capture(callback));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andReturn(response);
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2"));
EasyMock.replay(pubsubRpcMock, renewerMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
Iterator<ReceivedMessage> messageIterator = pubsub.pullAsync(SUBSCRIPTION, 42).get();
callback.getValue().success(response);
EasyMock.reset(renewerMock);
for (ReceivedMessage message : messageList) {
renewerMock.remove(SUBSCRIPTION, message.ackId());
Expand All @@ -1286,6 +1304,55 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
while (messageIterator.hasNext()) {
messageIterator.next();
}
EasyMock.verify(futureMock);
}

@Test
public void testPullMessagesError() throws ExecutionException, InterruptedException {
pubsub = new PubSubImpl(options, renewerMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.anyObject(PullCallback.class));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception));
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
try {
pubsub.pull(SUBSCRIPTION, 42);
fail("Expected PubSubException");
} catch (PubSubException ex) {
assertSame(exception, ex);
}
EasyMock.verify(futureMock);
}

@Test
public void testPullMessagesAsyncError() throws ExecutionException, InterruptedException {
pubsub = new PubSubImpl(options, renewerMock);
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
futureMock.addCallback(EasyMock.anyObject(PullCallback.class));
EasyMock.expectLastCall();
EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception));
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
try {
pubsub.pullAsync(SUBSCRIPTION, 42).get();
fail("Expected ExecutionException");
} catch (ExecutionException ex) {
assertSame(exception, ex.getCause());
}
EasyMock.verify(futureMock);
}

@Test
Expand Down