Skip to content

Commit

Permalink
refactor(Subject): update type declaration for Subject
Browse files Browse the repository at this point in the history
relates to #608
  • Loading branch information
kwonoj authored and benlesh committed Nov 3, 2015
1 parent e96d810 commit 61e173b
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 55 deletions.
53 changes: 23 additions & 30 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
return new BidirectionalSubject(source, destination);
}

destination: Observer<T>;
protected destination: Observer<T>;

observers: Observer<T>[] = [];
isUnsubscribed: boolean = false;
Expand All @@ -41,8 +41,7 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
return subject;
}

_subscribe(subscriber: Subscriber<any>) : Subscription<T> {

_subscribe(subscriber: Subscriber<any>): Subscription<T> {
if (subscriber.isUnsubscribed) {
return;
} else if (this.errorSignal) {
Expand Down Expand Up @@ -73,8 +72,7 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
subscriptionUnsubscribe.call(this);
}

next(value) {

next(value: T): void {
if (this.isUnsubscribed) {
return;
}
Expand All @@ -90,25 +88,23 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
}
}

error(error) {

error(err?: any): void {
if (this.isUnsubscribed || this.completeSignal) {
return;
}

this.errorSignal = true;
this.errorInstance = error;
this.errorInstance = err;

if (this.dispatching) {
return;
}

this._error(error);
this._error(err);
this.unsubscribe();
}

complete() {

complete(): void {
if (this.isUnsubscribed || this.errorSignal) {
return;
}
Expand All @@ -124,7 +120,7 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
}


_next(value?) {
_next(value: T): void {
let index = -1;
const observers = this.observers.slice(0);
const len = observers.length;
Expand All @@ -134,7 +130,7 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
}
}

_error(error?) {
_error(err: any): void{
let index = -1;
const observers = this.observers;
const len = observers.length;
Expand All @@ -144,13 +140,13 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
this.isUnsubscribed = true;

while(++index < len) {
observers[index].error(error);
observers[index].error(err);
}

this.isUnsubscribed = false;
}

_complete() {
_complete(): void{
let index = -1;
const observers = this.observers;
const len = observers.length;
Expand All @@ -168,39 +164,36 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
}

class BidirectionalSubject<T> extends Subject<T> {

constructor(source: Observable<any>, destination: Observer<any>) {
constructor(public source: Observable<any>, protected destination: Observer<any>) {
super();
this.source = source;
this.destination = destination;
}

_subscribe(subscriber: Subscriber<T>) {
_subscribe(subscriber: Subscriber<T>): Subscription<T> {
const operator = this.operator;
return this.source._subscribe.call(this.source, operator ? operator.call(subscriber) : subscriber);
}

next(x) {
subscriberNext.call(this, x);
next(value?: T): void {
subscriberNext.call(this, value);
}

error(e) {
subscriberError.call(this, e);
error(err?: any): void {
subscriberError.call(this, err);
}

complete() {
complete(): void {
subscriberComplete.call(this);
}

_next(x) {
_subscriberNext.call(this, x);
_next(value: T): void {
_subscriberNext.call(this, value);
}

_error(e) {
_subscriberError.call(this, e);
_error(err: any): void {
_subscriberError.call(this, err);
}

_complete() {
_complete(): void {
_subscriberComplete.call(this);
}
}
4 changes: 2 additions & 2 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ export default class Subscriber<T> extends Subscription<T> implements Observer<T
}
}

error(error?: any): void {
error(err?: any): void {
if (!this.isUnsubscribed) {
this._error(error);
this._error(err);
this.unsubscribe();
}
}
Expand Down
19 changes: 10 additions & 9 deletions src/operators/groupBy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';
import Observable from '../Observable';
import Subject from '../Subject';
import Map from '../util/Map';
Expand All @@ -22,7 +23,7 @@ export class GroupByObservable<T, R> extends Observable<GroupedObservable<R>> {
super();
}

_subscribe(subscriber) {
_subscribe(subscriber: Subscriber<any>): Subscription<T> | Function | void {
const refCountSubscription = new RefCountSubscription();
const groupBySubscriber = new GroupBySubscriber(
subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector
Expand All @@ -45,7 +46,7 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
this.add(destination);
}

_next(x: T) {
_next(x: T): void {
let key = tryCatch(this.keySelector)(x);
if (key === errorObject) {
this.error(key.e);
Expand All @@ -58,7 +59,7 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}

let group: Subject<R> = groups.get(key);
let group: Subject<T|R> = groups.get(key);

if (!group) {
groups.set(key, group = new Subject());
Expand Down Expand Up @@ -89,7 +90,7 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
}
}

_error(err: any) {
_error(err: any): void {
const groups = this.groups;
if (groups) {
groups.forEach((group, key) => {
Expand All @@ -100,7 +101,7 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
this.destination.error(err);
}

_complete() {
_complete(): void {
const groups = this.groups;
if (groups) {
groups.forEach((group, key) => {
Expand All @@ -111,7 +112,7 @@ class GroupBySubscriber<T, R> extends Subscriber<T> {
this.destination.complete();
}

removeGroup(key: string) {
removeGroup(key: string): void {
this.groups.delete(key);
}
}
Expand All @@ -123,17 +124,17 @@ class GroupDurationSubscriber<T> extends Subscriber<T> {
super(null);
}

_next(value: T) {
_next(value: T): void {
this.group.complete();
this.parent.removeGroup(this.key);
}

_error(err: any) {
_error(err: any): void {
this.group.error(err);
this.parent.removeGroup(this.key);
}

_complete() {
_complete(): void {
this.group.complete();
this.parent.removeGroup(this.key);
}
Expand Down
4 changes: 2 additions & 2 deletions src/subjects/BehaviorSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Subscriber from '../Subscriber';
import Subscription from '../Subscription';

export default class BehaviorSubject<T> extends Subject<T> {
constructor(public value: any) {
constructor(private value: any) {

This comment has been minimized.

Copy link
@benlesh

benlesh Nov 23, 2015

Member

Oops. this one got by me. This should remain public.

This comment has been minimized.

Copy link
@kwonoj

kwonoj Nov 23, 2015

Author Member

Oops 😓

super();
}

Expand All @@ -17,7 +17,7 @@ export default class BehaviorSubject<T> extends Subject<T> {
return subscription;
}

_next(value?) {
_next(value: T): void {
super._next(this.value = value);
}
}
17 changes: 5 additions & 12 deletions src/subjects/ReplaySubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class ReplaySubject<T> extends Subject<T> {
private bufferSize: number;
private _windowTime: number;
private scheduler: Scheduler;
private events: ReplayEvent<T> [] = [];
private events: ReplayEvent<T>[] = [];

constructor(bufferSize: number = Number.POSITIVE_INFINITY,
_windowTime: number = Number.POSITIVE_INFINITY,
Expand All @@ -19,7 +19,7 @@ export default class ReplaySubject<T> extends Subject<T> {
this.scheduler = scheduler;
}

_next(value?) {
_next(value: T): void {
const now = this._getNow();
this.events.push(new ReplayEvent(now, value));
super._next(value);
Expand All @@ -35,12 +35,11 @@ export default class ReplaySubject<T> extends Subject<T> {
return super._subscribe(subscriber);
}

private _getNow() {
private _getNow(): number {
return (this.scheduler || immediate).now();
}

private _getEvents(now) {

private _getEvents(now): ReplayEvent<T>[] {
const bufferSize = this.bufferSize;
const _windowTime = this._windowTime;
const events = this.events;
Expand Down Expand Up @@ -71,13 +70,7 @@ export default class ReplaySubject<T> extends Subject<T> {
}

class ReplayEvent<T> {

time: number;
value: T;

constructor(time: number, value: T) {
this.time = time;
this.value = value;
constructor(public time: number, public value: T) {
}
}

0 comments on commit 61e173b

Please sign in to comment.