Skip to content

Commit

Permalink
fix(Subject): do not expose static create method to inherited
Browse files Browse the repository at this point in the history
  • Loading branch information
OJ Kwon committed Aug 22, 2016
1 parent da8c1c2 commit 3107506
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 33 deletions.
6 changes: 6 additions & 0 deletions spec/subjects/AsyncSubject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {SubjectBase} from '../../dist/cjs/Subject';

const AsyncSubject = Rx.AsyncSubject;

Expand All @@ -21,6 +22,11 @@ class TestObserver implements Rx.Observer<number> {

/** @test {AsyncSubject} */
describe('AsyncSubject', () => {
it('should extend SubjectBase', () => {
const subject = new AsyncSubject();
expect(subject).to.instanceOf(SubjectBase);
});

it('should emit the last value when complete', () => {
const subject = new AsyncSubject();
const observer = new TestObserver();
Expand Down
6 changes: 3 additions & 3 deletions spec/subjects/BehaviorSubject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {SubjectBase} from '../../dist/cjs/Subject';
declare const {hot, expectObservable};

const BehaviorSubject = Rx.BehaviorSubject;
Expand All @@ -8,10 +9,9 @@ const ObjectUnsubscribedError = Rx.ObjectUnsubscribedError;

/** @test {BehaviorSubject} */
describe('BehaviorSubject', () => {
it('should extend Subject', (done: MochaDone) => {
it('should extend SubjectBase', () => {
const subject = new BehaviorSubject(null);
expect(subject instanceof Rx.Subject).to.be.true;
done();
expect(subject).to.instanceOf(SubjectBase);
});

it('should throw if it has received an error and getValue() is called', () => {
Expand Down
6 changes: 3 additions & 3 deletions spec/subjects/ReplaySubject-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {TestScheduler} from '../../dist/cjs/testing/TestScheduler';
import {SubjectBase} from '../../dist/cjs/Subject';
declare const {hot, expectObservable};

declare const rxTestScheduler: TestScheduler;
Expand All @@ -10,10 +11,9 @@ const Observable = Rx.Observable;

/** @test {ReplaySubject} */
describe('ReplaySubject', () => {
it('should extend Subject', (done: MochaDone) => {
it('should extend SubjectBase', () => {
const subject = new ReplaySubject();
expect(subject instanceof Rx.Subject).to.be.true;
done();
expect(subject).to.instanceOf(SubjectBase);
});

it('should replay values upon subscription', (done: MochaDone) => {
Expand Down
4 changes: 2 additions & 2 deletions src/AsyncSubject.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {Subject} from './Subject';
import {SubjectBase} from './Subject';
import {Subscriber} from './Subscriber';
import {Subscription} from './Subscription';

/**
* @class AsyncSubject<T>
*/
export class AsyncSubject<T> extends Subject<T> {
export class AsyncSubject<T> extends SubjectBase<T> {
private value: T = null;
private hasNext: boolean = false;
private hasCompleted: boolean = false;
Expand Down
4 changes: 2 additions & 2 deletions src/BehaviorSubject.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {Subject} from './Subject';
import {SubjectBase} from './Subject';
import {Subscriber} from './Subscriber';
import {Subscription, ISubscription} from './Subscription';
import {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';

/**
* @class BehaviorSubject<T>
*/
export class BehaviorSubject<T> extends Subject<T> {
export class BehaviorSubject<T> extends SubjectBase<T> {

constructor(private _value: T) {
super();
Expand Down
4 changes: 2 additions & 2 deletions src/ReplaySubject.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Subject} from './Subject';
import {SubjectBase} from './Subject';
import {Scheduler} from './Scheduler';
import {queue} from './scheduler/queue';
import {Subscriber} from './Subscriber';
Expand All @@ -8,7 +8,7 @@ import {ObserveOnSubscriber} from './operator/observeOn';
/**
* @class ReplaySubject<T>
*/
export class ReplaySubject<T> extends Subject<T> {
export class ReplaySubject<T> extends SubjectBase<T> {
private _events: ReplayEvent<T>[] = [];
private _bufferSize: number;
private _windowTime: number;
Expand Down
39 changes: 18 additions & 21 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,33 @@ import {$$rxSubscriber} from './symbol/rxSubscriber';
* @class SubjectSubscriber<T>
*/
export class SubjectSubscriber<T> extends Subscriber<T> {
constructor(protected destination: Subject<T>) {
constructor(protected destination: SubjectBase<T>) {
super(destination);
}
}

/**
* @class Subject<T>
*/
export class Subject<T> extends Observable<T> implements ISubscription {

export abstract class SubjectBase<T> extends Observable<T> implements ISubscription {
[$$rxSubscriber]() {
return new SubjectSubscriber(this);
}

observers: Observer<T>[] = [];

closed = false;

isStopped = false;

hasError = false;

thrownError: any = null;

constructor() {
super();
}

static create: Function = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
return new AnonymousSubject<T>(destination, source);
};

lift<T, R>(operator: Operator<T, R>): Observable<T> {
const subject = new AnonymousSubject(this, this);
subject.operator = operator;
return <any>subject;
}

next(value?: T) {
next(value?: T): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
Expand All @@ -63,7 +51,7 @@ export class Subject<T> extends Observable<T> implements ISubscription {
}
}

error(err: any) {
error(err: any): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
Expand All @@ -79,7 +67,7 @@ export class Subject<T> extends Observable<T> implements ISubscription {
this.observers.length = 0;
}

complete() {
complete(): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
Expand Down Expand Up @@ -121,30 +109,39 @@ export class Subject<T> extends Observable<T> implements ISubscription {
}
}

/**
* @class Subject<T>
*/
export class Subject<T> extends SubjectBase<T> {
static create: Function = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
return new AnonymousSubject<T>(destination, source);
};
}

/**
* @class AnonymousSubject<T>
*/
export class AnonymousSubject<T> extends Subject<T> {
export class AnonymousSubject<T> extends SubjectBase<T> {
constructor(protected destination?: Observer<T>, source?: Observable<T>) {
super();
this.source = source;
}

next(value: T) {
next(value: T): void {
const { destination } = this;
if (destination && destination.next) {
destination.next(value);
}
}

error(err: any) {
error(err: any): void {
const { destination } = this;
if (destination && destination.error) {
this.destination.error(err);
}
}

complete() {
complete(): void {
const { destination } = this;
if (destination && destination.complete) {
this.destination.complete();
Expand Down

0 comments on commit 3107506

Please sign in to comment.