Skip to content

Commit

Permalink
Merge pull request #27 from ktoso/tck-more-public-ktoso
Browse files Browse the repository at this point in the history
+tck add missing `public` modifiers,
  • Loading branch information
rkuhn committed Apr 24, 2014
2 parents dad6b35 + d8e5420 commit 8f38503
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public IdentityProcessorVerification(final TestEnvironment env, long publisherSh

this.subscriberVerification = new SubscriberVerification<T>(env) {
@Override
Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
public Subscriber<T> createSubscriber(SubscriberProbe<T> probe) {
return IdentityProcessorVerification.this.createSubscriber(probe);
}

@Override
Publisher<T> createHelperPublisher(int elements) {
public Publisher<T> createHelperPublisher(int elements) {
return IdentityProcessorVerification.this.createHelperPublisher(elements);
}
};
Expand Down Expand Up @@ -156,30 +156,28 @@ public void mustStartProducingWithTheOldestStillAvailableElementForASubscriber()
// must call `onError` on all its subscribers if it encounters a non-recoverable error
@Test
public void mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError() throws Exception {
new TestSetup(env, testBufferSize) {
{
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub1);
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub2);

sub1.requestMore(1);
expectRequestMore();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.requestMore(1);
new TestSetup(env, testBufferSize) {{
ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub1);
ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor.getPublisher(), sub2);

// sub1 now has received and element and has 1 pending
// sub2 has not yet requested anything
sub1.requestMore(1);
expectRequestMore();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.requestMore(1);

Exception ex = new RuntimeException("Test exception");
sendError(ex);
sub1.expectError(ex);
sub2.expectError(ex);
// sub1 now has received and element and has 1 pending
// sub2 has not yet requested anything

env.verifyNoAsyncErrors();
}
};
Exception ex = new RuntimeException("Test exception");
sendError(ex);
sub1.expectError(ex);
sub2.expectError(ex);

env.verifyNoAsyncErrors();
}};
}

@Test
Expand Down Expand Up @@ -498,7 +496,7 @@ public void mustUnblockTheStreamIfABlockingSubscriptionHasBeenCancelled() throws

/////////////////////// TEST INFRASTRUCTURE //////////////////////

abstract class TestSetup extends ManualPublisher<T> {
public abstract class TestSetup extends ManualPublisher<T> {
private TestEnvironment.ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
private Set<T> seenTees = new HashSet<T>();

Expand Down Expand Up @@ -540,7 +538,7 @@ public T sendNextTFromUpstream() throws InterruptedException {
}
}

private class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
public class ManualSubscriberWithErrorCollection<A> extends ManualSubscriberWithSubscriptionSupport<A> {
TestEnvironment.Promise<Throwable> error;

public ManualSubscriberWithErrorCollection(TestEnvironment env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() {

/////////////////////// TEST INFRASTRUCTURE //////////////////////

interface PublisherTestRun<T> {
public interface PublisherTestRun<T> {
public void run(Publisher<T> pub) throws Throwable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ protected SubscriberVerification(TestEnvironment env) {
* In order to be meaningfully testable your Subscriber must inform the given
* `SubscriberProbe` of the respective events having been received.
*/
abstract Subscriber<T> createSubscriber(SubscriberProbe<T> probe);
public abstract Subscriber<T> createSubscriber(SubscriberProbe<T> probe);

/**
* Helper method required for generating test elements.
* It must create a Publisher for a stream with exactly the given number of elements.
* If `elements` is zero the produced stream must be infinite.
*/
abstract Publisher<T> createHelperPublisher(int elements);
public abstract Publisher<T> createHelperPublisher(int elements);

////////////////////// TEST SETUP VERIFICATION ///////////////////////////

@Test
void exerciseHappyPath() throws InterruptedException {
public void exerciseHappyPath() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);

Expand Down Expand Up @@ -63,7 +63,7 @@ void exerciseHappyPath() throws InterruptedException {
// must asynchronously schedule a respective event to the subscriber
// must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
@Test
void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
public void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
// cannot be meaningfully tested, or can it?
}

Expand All @@ -72,14 +72,14 @@ void onSubscribeAndOnNextMustAsynchronouslyScheduleAnEvent() {
// must not call any methods on the Subscription, the Publisher or any other Publishers or Subscribers
// must consider the Subscription cancelled after having received the event
@Test
void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
public void onCompleteAndOnErrorMustAsynchronouslyScheduleAnEvent() {
// cannot be meaningfully tested, or can it?
}

// A Subscriber
// must not accept an `onSubscribe` event if it already has an active Subscription
@Test
void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() throws InterruptedException {
new TestSetup(env) {{
// try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
sub().onSubscribe(
Expand All @@ -100,7 +100,7 @@ public void cancel() {
// A Subscriber
// must call Subscription::cancel during shutdown if it still has an active Subscription
@Test
void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
public void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerShutdown();
expectCancelling();
Expand All @@ -112,14 +112,14 @@ void mustCallSubscriptionCancelDuringShutdownIfItStillHasAnActiveSubscription()
// A Subscriber
// must ensure that all calls on a Subscription take place from the same thread or provide for respective external synchronization
@Test
void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
public void mustEnsureThatAllCallsOnASubscriptionTakePlaceFromTheSameThreadOrProvideExternalSync() {
// cannot be meaningfully tested, or can it?
}

// A Subscriber
// must be prepared to receive one or more `onNext` events after having called Subscription::cancel
@Test
void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
public void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCancel() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);
puppet().triggerCancel();
Expand All @@ -133,7 +133,7 @@ void mustBePreparedToReceiveOneOrMoreOnNextEventsAfterHavingCalledSubscriptionCa
// A Subscriber
// must be prepared to receive an `onComplete` event with a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);
sendCompletion();
Expand All @@ -146,7 +146,7 @@ void mustBePreparedToReceiveAnOnCompleteEventWithAPrecedingSubscriptionRequestMo
// A Subscriber
// must be prepared to receive an `onComplete` event without a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
sendCompletion();
probe.expectCompletion();
Expand All @@ -158,7 +158,7 @@ void mustBePreparedToReceiveAnOnCompleteEventWithoutAPrecedingSubscriptionReques
// A Subscriber
// must be prepared to receive an `onError` event with a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
puppet().triggerRequestMore(1);
Exception ex = new RuntimeException("Test exception");
Expand All @@ -172,7 +172,7 @@ void mustBePreparedToReceiveAnOnErrorEventWithAPrecedingSubscriptionRequestMore(
// A Subscriber
// must be prepared to receive an `onError` event without a preceding Subscription::requestMore call
@Test
void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
public void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMore() throws InterruptedException {
new TestSetup(env) {{
Exception ex = new RuntimeException("Test exception");
sendError(ex);
Expand All @@ -184,15 +184,15 @@ void mustBePreparedToReceiveAnOnErrorEventWithoutAPrecedingSubscriptionRequestMo
// A Subscriber
// must make sure that all calls on its `onXXX` methods happen-before the processing of the respective events
@Test
void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
public void mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() {
// cannot be meaningfully tested, or can it?
}

/////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////

/////////////////////// TEST INFRASTRUCTURE //////////////////////

class TestSetup extends ManualPublisher<T> {
public class TestSetup extends ManualPublisher<T> {
ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
Probe probe;
T lastT = null;
Expand All @@ -205,24 +205,24 @@ public TestSetup(TestEnvironment env) throws InterruptedException {
probe.puppet.expectCompletion(env.defaultTimeoutMillis(), String.format("Subscriber %s did not `registerOnSubscribe`", sub()));
}

Subscriber<T> sub() {
public Subscriber<T> sub() {
return subscriber.get();
}

SubscriberPuppet puppet() {
public SubscriberPuppet puppet() {
return probe.puppet.value();
}

void sendNextTFromUpstream() throws InterruptedException {
public void sendNextTFromUpstream() throws InterruptedException {
sendNext(nextT());
}

T nextT() throws InterruptedException {
public T nextT() throws InterruptedException {
lastT = tees.requestNextElement();
return lastT;
}

class Probe implements SubscriberProbe<T> {
public class Probe implements SubscriberProbe<T> {
Promise<SubscriberPuppet> puppet = new Promise<SubscriberPuppet>(env);
Receptacle<T> elements = new Receptacle<T>(env);
Latch completed = new Latch(env);
Expand All @@ -248,30 +248,30 @@ public void registerOnError(Throwable cause) {
error.complete(cause);
}

void expectNext(T expected) throws InterruptedException {
public void expectNext(T expected) throws InterruptedException {
expectNext(expected, env.defaultTimeoutMillis());
}

void expectNext(T expected, long timeoutMillis) throws InterruptedException {
public void expectNext(T expected, long timeoutMillis) throws InterruptedException {
T received = elements.next(timeoutMillis, String.format("Subscriber %s did not call `registerOnNext(%s)`", sub(), expected));
if (!received.equals(expected)) {
env.flop(String.format("Subscriber %s called `registerOnNext(%s)` rather than `registerOnNext(%s)`", sub(), received, expected));
}
}

void expectCompletion() throws InterruptedException {
public void expectCompletion() throws InterruptedException {
expectCompletion(env.defaultTimeoutMillis());
}

void expectCompletion(long timeoutMillis) throws InterruptedException {
public void expectCompletion(long timeoutMillis) throws InterruptedException {
completed.expectClose(timeoutMillis, String.format("Subscriber %s did not call `registerOnComplete()`", sub()));
}

void expectError(Throwable expected) throws InterruptedException {
public void expectError(Throwable expected) throws InterruptedException {
expectError(expected, env.defaultTimeoutMillis());
}

void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
public void expectError(Throwable expected, long timeoutMillis) throws InterruptedException {
error.expectCompletion(timeoutMillis, String.format("Subscriber %s did not call `registerOnError(%s)`", sub(), expected));
if (error.value() != expected) {
env.flop(String.format("Subscriber %s called `registerOnError(%s)` rather than `registerOnError(%s)`", sub(), error.value(), expected));
Expand All @@ -284,7 +284,7 @@ public void verifyNoAsyncErrors() {
}
}

interface SubscriberProbe<T> {
public interface SubscriberProbe<T> {
/**
* Must be called by the test subscriber when it has received the `onSubscribe` event.
*/
Expand All @@ -306,7 +306,7 @@ interface SubscriberProbe<T> {
void registerOnError(Throwable cause);
}

interface SubscriberPuppet {
public interface SubscriberPuppet {
void triggerShutdown();

void triggerRequestMore(int elements);
Expand Down
Loading

0 comments on commit 8f38503

Please sign in to comment.