Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
sammcveety committed Feb 15, 2017
1 parent 20862aa commit d7a70fe
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ private static class State {
*/
@Nullable
Map<String, Long> ackDeadline;

/**
* Whether a subscription has been created.
*/
boolean createdSubscription;
}

private static final State STATE = new State();
Expand All @@ -124,12 +129,40 @@ public static PubsubTestClientFactory createFactoryForPublish(
final TopicPath expectedTopic,
final Iterable<OutgoingMessage> expectedOutgoingMessages,
final Iterable<OutgoingMessage> failingOutgoingMessages) {
return createFactoryForPublishInternal(
expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, false);
}

/**
* Return a factory for testing publishers. Only one factory may be in-flight at a time.
* The factory must be closed when the test is complete, at which point final validation will
* occur. Additionally, verify that createSubscription was called.
*/
public static PubsubTestClientFactory createFactoryForPublishVerifySubscription(
final TopicPath expectedTopic,
final Iterable<OutgoingMessage> expectedOutgoingMessages,
final Iterable<OutgoingMessage> failingOutgoingMessages) {
return createFactoryForPublishInternal(
expectedTopic, expectedOutgoingMessages, failingOutgoingMessages, true);
}

/**
* Return a factory for testing publishers. Only one factory may be in-flight at a time.
* The factory must be closed when the test is complete, at which point final validation will
* occur.
*/
public static PubsubTestClientFactory createFactoryForPublishInternal(
final TopicPath expectedTopic,
final Iterable<OutgoingMessage> expectedOutgoingMessages,
final Iterable<OutgoingMessage> failingOutgoingMessages,
final boolean verifySubscriptionCreated) {
synchronized (STATE) {
checkState(!STATE.isActive, "Test still in flight");
STATE.expectedTopic = expectedTopic;
STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
STATE.isActive = true;
STATE.createdSubscription = false;
}
return new PubsubTestClientFactory() {
@Override
Expand All @@ -148,6 +181,9 @@ public String getKind() {
@Override
public void close() {
synchronized (STATE) {
if (verifySubscriptionCreated) {
checkState(STATE.createdSubscription, "Did not call create subscription");
}
checkState(STATE.isActive, "No test still in flight");
checkState(STATE.remainingExpectedOutgoingMessages.isEmpty(),
"Still waiting for %s messages to be published",
Expand Down Expand Up @@ -372,6 +408,9 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
synchronized (STATE) {
STATE.createdSubscription = true;
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public void readManyMessages() throws IOException {

@Test
public void testNullSubscription() throws Exception {
factory = PubsubTestClient.createFactoryForPublish(
factory = PubsubTestClient.createFactoryForPublishVerifySubscription(
TOPIC, ImmutableList.<OutgoingMessage>of(), ImmutableList.<OutgoingMessage>of());
TestPipeline p = TestPipeline.create();
p.apply(new PubsubUnboundedSource<>(
Expand Down

0 comments on commit d7a70fe

Please sign in to comment.