Skip to content

Commit

Permalink
remove requirement to track demand beyond Long.MAX_VALUE
Browse files Browse the repository at this point in the history
fixes #196

Also terminate spec rule sentences with a full stop where missing.
  • Loading branch information
rkuhn committed Jan 23, 2015
1 parent 13ddd32 commit a3005b7
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 83 deletions.
104 changes: 52 additions & 52 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ final class SubscriptionImpl implements Subscription, Runnable {
private void doRequest(final long n) {
if (n < 1)
terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
else if (demand + n < 1)
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 3.17 by demanding more elements than Long.MAX_VALUE."));
else {
else if (demand + n < 1) {
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded"
doSend(); // Then we proceed with sending data downstream
} else {
demand += n; // Here we record the downstream demand
doSend(); // Then we can proceed with sending data downstream
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
}

@Override @Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
publisherVerification.required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
publisherVerification.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,25 +940,30 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa
activePublisherTest(totalElements, true, new PublisherTestRun<T>() {
@Override
public void run(Publisher<T> pub) throws Throwable {
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
final ManualSubscriber<T> sub = env.newManualSubscriber(pub);
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE / 2
sub.request(Long.MAX_VALUE / 2); // pending = Long.MAX_VALUE - 1
sub.request(1); // pending = Long.MAX_VALUE

sub.nextElements(totalElements);
sub.expectCompletion();

env.verifyNoAsyncErrors();
try {
env.verifyNoAsyncErrors();
} finally {
sub.cancel();
}

}
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#3.17
@Override @Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable {
activePublisherTest(Integer.MAX_VALUE, false, new PublisherTestRun<T>() {
@Override public void run(Publisher<T> pub) throws Throwable {
ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
final ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(env) {
// arbitrarily set limit on nuber of request calls signalled, we expect overflow after already 2 calls,
// so 10 is relatively high and safe even if arbitrarily chosen
int callsCounter = 10;
Expand All @@ -982,10 +987,12 @@ public void onNext(T element) {
// we're pretty sure to overflow from those
sub.request(1);

sub.expectErrorWithMessage(IllegalStateException.class, "3.17");

// onError must be signalled only once, even with in-flight other request() messages that would trigger overflow again
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
// no onError should be signalled
try {
env.verifyNoAsyncErrors(env.defaultTimeoutMillis());
} finally {
sub.cancel();
}
}
});
}
Expand Down Expand Up @@ -1105,4 +1112,4 @@ public void notVerified(String message) {
throw new SkipException(message);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface PublisherVerificationRules {
void required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() throws Throwable;
void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,17 +473,7 @@ public void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue_sho
}

@Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onAsynchDemandIgnoringPublisher() throws Throwable {
final ExecutorService signallersPool = Executors.newFixedThreadPool(2);
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
demandIgnoringAsynchronousPublisherVerification(signallersPool).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
}
}, "Expected onError(java.lang.IllegalStateException)");
}

@Test
public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchDemandIgnoringPublisher() throws Throwable {
public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue_shouldFail_onSynchOverflowingPublisher() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customPublisherVerification(new Publisher<Integer>() {
Expand All @@ -494,20 +484,26 @@ public void required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue_shoul
@Override public void request(long n) {
// it does not protect from demand overflow!
demand += n;
if (demand < 0) {
// overflow
s.onError(new IllegalStateException("Illegally signalling onError (violates rule 3.17)")); // Illegally signal error
} else {
s.onNext(0);
}
}

@Override public void cancel() {
// noop
}
});
}
}).required_spec317_mustSignalOnErrorWhenPendingAboveLongMaxValue();
}).required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue();
}
}, "Expected onError(java.lang.IllegalStateException)");
}, "Async error during test execution: Illegally signalling onError (violates rule 3.17)");
}

@Test
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFail_overflowingDemand() throws Throwable {
public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue_shouldFailWhenErrorSignalledOnceMaxValueReached() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customPublisherVerification(new Publisher<Integer>() {
Expand All @@ -520,15 +516,16 @@ public void required_spec317_mustSupportACumulativePendingElementCountUpToLongMa

// this is a mistake, it should still be able to accumulate such demand
if (demand == Long.MAX_VALUE)
s.onError(new IllegalStateException("I'm signalling onError too soon! Cumulative demand equal to Long.MAX_VALUE is OK by the spec."));
s.onError(new IllegalStateException("Illegally signalling onError too soon! " +
"Cumulative demand equal to Long.MAX_VALUE is legal."));

s.onNext(0);
}
});
}
}).required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue();
}
}, "Async error during test execution: I'm signalling onError too soon!");
}, "Async error during test execution: Illegally signalling onError too soon!");
}


Expand Down

0 comments on commit a3005b7

Please sign in to comment.