Skip to content

Commit

Permalink
feat(operator): add skipUntil
Browse files Browse the repository at this point in the history
closes #180
  • Loading branch information
benlesh committed Aug 19, 2015
1 parent 1e13aea commit ef2620e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 0 deletions.
16 changes: 16 additions & 0 deletions spec/operators/skipUntil-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* globals describe, it, expect, jasmine */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.skipUntil()', function () {
it('should skip values until another observable notifies', function (done) {
var expected = [5];

Observable.timer(0, 10)
.skipUntil(Observable.timer(45))
.take(1)
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export default class Observable<T> {
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable<T>;
skip: (count: number) => Observable<T>;
skipUntil: (notifier: Observable<any>) => Observable<T>;
take: (count: number) => Observable<T>;
takeUntil: (observable: Observable<any>) => Observable<T>;
partition: (predicate: (x: T) => boolean) => Observable<T>[];
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ observableProto.startWith = startWith;

import take from './operators/take';
import skip from './operators/skip';
import skipUntil from './operators/skipUntil';
import takeUntil from './operators/takeUntil';
import filter from './operators/filter';
import distinctUntilChanged from './operators/distinctUntilChanged';
Expand All @@ -101,6 +102,7 @@ import distinctUntilKeyChanged from './operators/distinctUntilKeyChanged';
observableProto.take = take;
observableProto.skip = skip;
observableProto.takeUntil = takeUntil;
observableProto.skipUntil = skipUntil;
observableProto.filter = filter;
observableProto.distinctUntilChanged = distinctUntilChanged;
observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged;
Expand Down
46 changes: 46 additions & 0 deletions src/operators/skipUntil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

export default function skipUntil(total) {
return this.lift(new SkipUntilOperator(total));
}

export class SkipUntilOperator<T, R> extends Operator<T, R> {
constructor(private notifier: Observable<any>) {
super();
}

call(observer: Observer<R>): Observer<T> {
return new SkipUntilSubscriber<T>(observer, this.notifier);
}
}

export class SkipUntilSubscriber<T> extends Subscriber<T> {
private notificationSubscriber: NotificationSubscriber<any> = new NotificationSubscriber();

constructor(destination: Observer<T>, private notifier: Observable<any>) {
super(destination);
this.add(this.notifier.subscribe(this.notificationSubscriber))
}

_next(x) {
if (this.notificationSubscriber.hasNotified) {
this.destination.next(x);
}
}
}

export class NotificationSubscriber<T> extends Subscriber<T> {
hasNotified: boolean = false;

constructor() {
super(null);
}

_next() {
this.hasNotified = true;
this.unsubscribe();
}
}

0 comments on commit ef2620e

Please sign in to comment.