Skip to content

Commit

Permalink
remove rule 1:12 (produce same elements to all Subscribers)
Browse files Browse the repository at this point in the history
This rule is in conflict with 1:11 which allows a Publisher to treat
multiple Subscribers as either as unicast or multicast recipients. The
verification of proper multicast behavior (which 1:12 specified) has
been retained, the test methods renamed accordingly.
  • Loading branch information
rkuhn committed Feb 17, 2015
1 parent b33420a commit 607da83
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 17 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public interface Publisher<T> {
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |

[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,18 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
}

@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
}

@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}

@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
}

@Override @Test
Expand Down Expand Up @@ -755,4 +755,4 @@ public void expectError(Throwable cause, long timeoutMillis) throws InterruptedE
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ public void run(Publisher<T> pub) throws Throwable {

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws InterruptedException {
Expand Down Expand Up @@ -602,7 +602,7 @@ public void run(Publisher<T> pub) throws InterruptedException {

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand Down Expand Up @@ -635,7 +635,7 @@ public void run(Publisher<T> pub) throws Throwable {

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public interface PublisherVerificationRules {
void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable;
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable;
void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable;
void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception;
Expand All @@ -42,4 +42,4 @@ public interface PublisherVerificationRules {
void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void optional_spec111_maySupportMultiSubscribe_shouldFailBy_actuallyPass(
}

@Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customPublisherVerification(new Publisher<Integer>() {
Expand All @@ -370,7 +370,7 @@ public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfI
}
});
}
}).required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}).optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}
}, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers: Lists differ at element ");
}
Expand Down

0 comments on commit 607da83

Please sign in to comment.