Skip to content

Commit

Permalink
refactor(groupBy): reapply tryCatch perf improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jan 27, 2016
1 parent 815cfae commit 39ebae9
Showing 1 changed file with 51 additions and 36 deletions.
87 changes: 51 additions & 36 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import {Operator} from '../Operator';
import {Subject} from '../Subject';
import {Map} from '../util/Map';
import {FastMap} from '../util/FastMap';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

/**
* Groups the items emitted by an Observable according to a specified criterion,
Expand Down Expand Up @@ -62,48 +60,65 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
this.add(destination);
}

protected _next(x: T): void {
let key = tryCatch(this.keySelector)(x);
if (key === errorObject) {
this.error(errorObject.e);
} else {
let groups = this.groups;
const elementSelector = this.elementSelector;
const durationSelector = this.durationSelector;
protected _next(value: T): void {
let key: K;
try {
key = this.keySelector(value);
} catch (err) {
this.error(err);
return;
}

if (!groups) {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}
this._group(value, key);
}

let group = groups.get(key);
private _group(value: T, key: K) {
let groups = this.groups;

if (!group) {
groups.set(key, group = new Subject<R>());
let groupedObservable = new GroupedObservable(key, group, this);
if (!groups) {
groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
}

if (durationSelector) {
let duration = tryCatch(durationSelector)(new GroupedObservable<K, R>(key, <any>group));
if (duration === errorObject) {
this.error(errorObject.e);
} else {
this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
}
}
let group = groups.get(key);

this.destination.next(groupedObservable);
}
if (!group) {
groups.set(key, group = new Subject<R>());
let groupedObservable = new GroupedObservable(key, group, this);

if (elementSelector) {
let value = tryCatch(elementSelector)(x);
if (value === errorObject) {
this.error(errorObject.e);
} else {
group.next(value);
}
} else {
group.next(x);
if (this.durationSelector) {
this._selectDuration(key, group);
}

this.destination.next(groupedObservable);
}

if (this.elementSelector) {
this._selectElement(value, group);
} else {
group.next(value);
}
}

private _selectElement(value: T, group: Subject<T | R>) {
let result: R;
try {
result = this.elementSelector(value);
} catch (err) {
this.error(err);
return;
}
group.next(result);
}

private _selectDuration(key: K, group: any) {
let duration: any;
try {
duration = this.durationSelector(new GroupedObservable<K, R>(key, group));
} catch (err) {
this.error(err);
return;
}
this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
}

protected _error(err: any): void {
Expand Down

0 comments on commit 39ebae9

Please sign in to comment.