Skip to content

Commit

Permalink
Merge pull request #225 from reactive-streams/wip-remove-1.12
Browse files Browse the repository at this point in the history
remove rule 1:12 (produce same elements to all Subscribers)
  • Loading branch information
rkuhn committed Mar 12, 2015
2 parents b33420a + 29ccc42 commit 63074af
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 20 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public interface Publisher<T> {
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST return normally, except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
| <a name="1.12">12</a> | A `Publisher` MUST produce the same elements, starting with the oldest element still available, in the same sequence for all its subscribers and MAY produce the stream elements at (temporarily) differing rates to different subscribers. |

[<a name="footnote-1-1">1</a>] : A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, shut-down or in a failed state.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,18 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
}

@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();
}

@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}

@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
publisherVerification.required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected();
}

@Override @Test
Expand Down Expand Up @@ -755,4 +755,4 @@ public void expectError(Throwable cause, long timeoutMillis) throws InterruptedE
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,9 @@ public void run(Publisher<T> pub) throws Throwable {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws InterruptedException {
Expand Down Expand Up @@ -600,9 +600,9 @@ public void run(Publisher<T> pub) throws InterruptedException {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable {
optionalActivePublisherTest(3, false, new PublisherTestRun<T>() { // This test is skipped if the publisher cannot produce enough elements
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand Down Expand Up @@ -633,9 +633,9 @@ public void run(Publisher<T> pub) throws Throwable {
});
}

// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.12
// Verifies rule: https://github.com/reactive-streams/reactive-streams#1.11
@Override @Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable {
optionalActivePublisherTest(3, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)
@Override
public void run(Publisher<T> pub) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public interface PublisherVerificationRules {
void required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() throws Throwable;
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable;
void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable;
void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable;
void required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() throws Throwable;
void required_spec303_mustNotAllowUnboundedRecursion() throws Throwable;
void untested_spec304_requestShouldNotPerformHeavyComputations() throws Exception;
Expand All @@ -42,4 +42,4 @@ public interface PublisherVerificationRules {
void required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() throws Throwable;
void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() throws Throwable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void optional_spec111_maySupportMultiSubscribe_shouldFailBy_actuallyPass(
}

@Test
public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable {
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
@Override public void run() throws Throwable {
customPublisherVerification(new Publisher<Integer>() {
Expand All @@ -370,7 +370,7 @@ public void required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfI
}
});
}
}).required_spec112_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}).optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront();
}
}, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers: Lists differ at element ");
}
Expand Down

0 comments on commit 63074af

Please sign in to comment.