Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BIG: Subject rewrite #1701

Merged
merged 11 commits into from
May 22, 2016
169 changes: 35 additions & 134 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ describe('Subject', () => {
subject.complete();
});

it('should have the rxSubscriber Symbol', () => {
const subject = new Subject();
expect(subject[Rx.Symbol.rxSubscriber]).to.be.a('function');
});

it('should pump values to multiple subscribers', (done: MochaDone) => {
const subject = new Subject();
const expected = ['foo', 'bar'];
Expand Down Expand Up @@ -276,82 +271,7 @@ describe('Subject', () => {
expect(results3).to.deep.equal([]);
});

it('should allow ad-hoc subscription to be added to itself', () => {
const subject = new Subject();
const results1 = [];
const results2 = [];

const auxSubject = new Subject();

const subscription1 = subject.subscribe(
function (x) { results1.push(x); },
function (e) { results1.push('E'); },
() => { results1.push('C'); }
);
const subscription2 = auxSubject.subscribe(
function (x) { results2.push(x); },
function (e) { results2.push('E'); },
() => { results2.push('C'); }
);

subject.add(subscription2);

subject.next(1);
subject.next(2);
subject.next(3);
auxSubject.next('a');
auxSubject.next('b');

subscription1.unsubscribe();
subject.unsubscribe();

auxSubject.next('c');
auxSubject.next('d');

expect(results1).to.deep.equal([1, 2, 3]);
expect(subscription2.isUnsubscribed).to.be.true;
expect(results2).to.deep.equal(['a', 'b']);
});

it('should allow ad-hoc subscription to be removed from itself', () => {
const subject = new Subject();
const results1 = [];
const results2 = [];

const auxSubject = new Subject();

const subscription1 = subject.subscribe(
function (x) { results1.push(x); },
function (e) { results1.push('E'); },
() => { results1.push('C'); }
);
const subscription2 = auxSubject.subscribe(
function (x) { results2.push(x); },
function (e) { results2.push('E'); },
() => { results2.push('C'); }
);

subject.add(subscription2);

subject.next(1);
subject.next(2);
subject.next(3);
auxSubject.next('a');
auxSubject.next('b');

subject.remove(subscription2);
subscription1.unsubscribe();
subject.unsubscribe();

auxSubject.next('c');
auxSubject.next('d');

expect(results1).to.deep.equal([1, 2, 3]);
expect(subscription2.isUnsubscribed).to.be.false;
expect(results2).to.deep.equal(['a', 'b', 'c', 'd']);
});

it('should not allow values to be nexted after a return', (done: MochaDone) => {
it('should not allow values to be nexted after it is unsubscribed', (done: MochaDone) => {
const subject = new Subject();
const expected = ['foo'];

Expand All @@ -360,7 +280,7 @@ describe('Subject', () => {
});

subject.next('foo');
subject.complete();
subject.unsubscribe();
expect(() => subject.next('bar')).to.throw(Rx.ObjectUnsubscribedError);
done();
});
Expand Down Expand Up @@ -528,38 +448,24 @@ describe('Subject', () => {
}).to.throw(Rx.ObjectUnsubscribedError);
});

it('should throw ObjectUnsubscribedError when emit after completed', () => {
it('should not next after completed', () => {
const subject = new Rx.Subject();
const results = [];
subject.subscribe(x => results.push(x), null, () => results.push('C'));
subject.next('a');
subject.complete();

expect(() => {
subject.next('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.error('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.complete();
}).to.throw(Rx.ObjectUnsubscribedError);
subject.next('b');
expect(results).to.deep.equal(['a', 'C']);
});

it('should throw ObjectUnsubscribedError when emit after error', () => {
it('should not next after error', () => {
const subject = new Rx.Subject();
subject.error('e');

expect(() => {
subject.next('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.error('a');
}).to.throw(Rx.ObjectUnsubscribedError);

expect(() => {
subject.complete();
}).to.throw(Rx.ObjectUnsubscribedError);
const results = [];
subject.subscribe(x => results.push(x), (err) => results.push(err));
subject.next('a');
subject.error(new Error('wut?'));
subject.next('b');
expect(results).to.deep.equal(['a', new Error('wut?')]);
});

describe('asObservable', () => {
Expand Down Expand Up @@ -600,43 +506,38 @@ describe('Subject', () => {
expectObservable(observable).toBe(expected);
});

it('should work with inherited subject', (done: MochaDone) => {
it('should work with inherited subject', () => {
const results = [];
const subject = new Rx.AsyncSubject();

subject.next(42);
subject.complete();

const observable = subject.asObservable();

const expected = [new Rx.Notification('N', 42),
new Rx.Notification('C')];
observable.subscribe(x => results.push(x), null, () => results.push('done'));

observable.materialize().subscribe((x: Rx.Notification<number>) => {
expect(x).to.deep.equal(expected.shift());
}, (err: any) => {
done(err);
}, () => {
expect(expected).to.deep.equal([]);
done();
});
expect(results).to.deep.equal([42, 'done']);
});
});
});

it('should not eager', () => {
let subscribed = false;
describe('AnonymousSubject', () => {
it('should not eager', () => {
let subscribed = false;

const subject = new Rx.Subject(null, new Rx.Observable((observer: Rx.Observer<any>) => {
subscribed = true;
const subscription = Rx.Observable.of('x').subscribe(observer);
return () => {
subscription.unsubscribe();
};
}));
const subject = Rx.Subject.create(null, new Rx.Observable((observer: Rx.Observer<any>) => {
subscribed = true;
const subscription = Rx.Observable.of('x').subscribe(observer);
return () => {
subscription.unsubscribe();
};
}));

const observable = subject.asObservable();
expect(subscribed).to.be.false;
const observable = subject.asObservable();
expect(subscribed).to.be.false;

observable.subscribe();
expect(subscribed).to.be.true;
});
observable.subscribe();
expect(subscribed).to.be.true;
});
});
5 changes: 0 additions & 5 deletions spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ const Subscriber = Rx.Subscriber;

/** @test {Subscriber} */
describe('Subscriber', () => {
it('should have the rxSubscriber symbol', () => {
const sub = new Subscriber();
expect(sub[Rx.Symbol.rxSubscriber]()).to.equal(sub);
});

describe('when created through create()', () => {
it('should not call error() if next() handler throws an error', () => {
const errorSpy = sinon.spy();
Expand Down
58 changes: 36 additions & 22 deletions spec/operators/cache-spec.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,45 @@
import * as Rx from '../../dist/cjs/Rx';
import {expect} from 'chai';
declare const {hot, cold, time, expectObservable};

declare const rxTestScheduler: Rx.TestScheduler;

/** @test {cache} */
describe('Observable.prototype.cache', () => {
it('should just work™', () => {
let subs = 0;
const source = Rx.Observable.create(observer => {
subs++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).cache();
let results = [];
source.subscribe(x => results.push(x));
expect(results).to.deep.equal([1, 2, 3]);
expect(subs).to.equal(1);
results = [];
source.subscribe(x => results.push(x));
expect(results).to.deep.equal([1, 2, 3]);
expect(subs).to.equal(1);
});

it('should replay values upon subscription', () => {
const s1 = hot('---^---a---b---c---| ').cache();
const expected1 = '----a---b---c---| ';
const expected2 = ' (abc|)';
const t = time( '----------------|');
const s1 = hot( '----a---b---c---| ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---| ';
const expected2 = ' (abc|)';
const sub2 = '------------------| ';

expectObservable(s1).toBe(expected1);

rxTestScheduler.schedule(() => {
expectObservable(s1).toBe(expected2);
}, t);
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), time(sub2));
});

it('should replay values and error', () => {
const s1 = hot('---^---a---b---c---# ').cache();
const s1 = hot('---^---a---b---c---# ').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c---# ';
const expected2 = ' (abc#)';
const t = time( '----------------|');
const expected2 = ' (abc#)';
const t = time( '------------------|');

expectObservable(s1).toBe(expected1);

Expand All @@ -32,7 +49,7 @@ describe('Observable.prototype.cache', () => {
});

it('should replay values and and share', () => {
const s1 = hot('---^---a---b---c------------d--e--f-|').cache();
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(undefined, undefined, rxTestScheduler);
const expected1 = '----a---b---c------------d--e--f-|';
const expected2 = ' (abc)----d--e--f-|';
const t = time( '----------------|');
Expand All @@ -58,16 +75,13 @@ describe('Observable.prototype.cache', () => {
});

it('should have a bufferCount that limits the replay test 2', () => {
const s1 = hot('---^---a---b---c------------d--e--f-|').cache(2);
const s1 = hot( '----a---b---c------------d--e--f-|').cache(2);
const expected1 = '----a---b---c------------d--e--f-|';
const expected2 = ' (bc)-----d--e--f-|';
const t = time( '----------------|');

expectObservable(s1).toBe(expected1);

rxTestScheduler.schedule(() => {
expectObservable(s1).toBe(expected2);
}, t);
rxTestScheduler.schedule(() => expectObservable(s1).toBe(expected2), t);
});

it('should accept a windowTime that limits the replay', () => {
Expand All @@ -85,7 +99,7 @@ describe('Observable.prototype.cache', () => {
});

it('should handle empty', () => {
const s1 = cold('|').cache();
const s1 = cold('|').cache(undefined, undefined, rxTestScheduler);
const expected1 = '|';
const expected2 = ' |';
const t = time( '----------------|');
Expand All @@ -98,7 +112,7 @@ describe('Observable.prototype.cache', () => {
});

it('should handle throw', () => {
const s1 = cold('#').cache();
const s1 = cold('#').cache(undefined, undefined, rxTestScheduler);
const expected1 = '#';
const expected2 = ' #';
const t = time( '----------------|');
Expand All @@ -111,7 +125,7 @@ describe('Observable.prototype.cache', () => {
});

it('should handle never', () => {
const s1 = cold('-').cache();
const s1 = cold('-').cache(undefined, undefined, rxTestScheduler);
const expected1 = '-';
const expected2 = ' -';
const t = time( '----------------|');
Expand All @@ -124,7 +138,7 @@ describe('Observable.prototype.cache', () => {
});

it('should multicast a completion', () => {
const s1 = hot('--a--^--b------c-----d------e-|').cache();
const s1 = hot('--a--^--b------c-----d------e-|').cache(undefined, undefined, rxTestScheduler);
const t1 = time( '| ');
const e1 = '---b------c-----d------e-|';
const t2 = time( '----------| ');
Expand All @@ -142,7 +156,7 @@ describe('Observable.prototype.cache', () => {
});

it('should multicast an error', () => {
const s1 = hot('--a--^--b------c-----d------e-#').cache();
const s1 = hot('--a--^--b------c-----d------e-#').cache(undefined, undefined, rxTestScheduler);
const t1 = time( '| ');
const e1 = '---b------c-----d------e-#';
const t2 = time( '----------| ');
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/do-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('Observable.prototype.do', () => {
expect(value).to.equal(42);
});

it('should complete with a callback', () => {
it('should error with a callback', () => {
let err = null;
Observable.throw('bad').do(null, function (x) {
err = x;
Expand Down
Loading