diff --git a/spec/operators/distinctUntilChanged-spec.js b/spec/operators/distinctUntilChanged-spec.js new file mode 100644 index 0000000000..bacfbd6666 --- /dev/null +++ b/spec/operators/distinctUntilChanged-spec.js @@ -0,0 +1,15 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.distinctUntilChanged()', function () { + it('should distinguish between values', function (done) { + var expected = [1, 2, 1]; + Observable + .of(1, 1, 1, 2, 2, 1) + .distinctUntilChanged() + .subscribe(function(x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); +}); \ No newline at end of file diff --git a/spec/operators/distinctUntilKeyChanged-spec.js b/spec/operators/distinctUntilKeyChanged-spec.js new file mode 100644 index 0000000000..6999a5c2ef --- /dev/null +++ b/spec/operators/distinctUntilKeyChanged-spec.js @@ -0,0 +1,15 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.distinctUntilKeyChanged()', function () { + it('should distinguish between values', function (done) { + var expected = [{val: 1}, {val: 2}, {val: 1}]; + Observable + .of({val: 1}, {val: 1}, {val: 1}, {val: 2}, {val: 2}, {val: 1}) + .distinctUntilKeyChanged("val") + .subscribe(function(x) { + expect(x).toDeepEqual(expected.shift()); + }, null, done); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index a93c684774..56695898fa 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -129,6 +129,8 @@ export default class Observable { startWith: (x: T) => Observable; filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; + distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; + distinctUntilKeyChanged: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable; skip: (count: number) => Observable; take: (count: number) => Observable; takeUntil: (observable: Observable) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index cc1b061ef6..18b4e55a8c 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -87,15 +87,19 @@ observableProto.scan = scan; observableProto.reduce = reduce; observableProto.startWith = startWith; -import filter from './operators/filter'; import take from './operators/take'; import skip from './operators/skip'; import takeUntil from './operators/takeUntil'; +import filter from './operators/filter'; +import distinctUntilChanged from './operators/distinctUntilChanged'; +import distinctUntilKeyChanged from './operators/distinctUntilKeyChanged'; observableProto.take = take; observableProto.skip = skip; observableProto.takeUntil = takeUntil; observableProto.filter = filter; +observableProto.distinctUntilChanged = distinctUntilChanged; +observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged; import combineLatest from './operators/combineLatest'; import combineAll from './operators/combineAll'; diff --git a/src/operators/distinctUntilChanged.ts b/src/operators/distinctUntilChanged.ts new file mode 100644 index 0000000000..949d53ee19 --- /dev/null +++ b/src/operators/distinctUntilChanged.ts @@ -0,0 +1,64 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function distinctUntilChanged(compare?: (x: T, y: T) => boolean, thisArg?: any) { + return this.lift(new DistinctUntilChangedOperator(thisArg ? + <(x: T, y: T) => boolean> bindCallback(compare, thisArg, 2) : + compare)); +} + +export class DistinctUntilChangedOperator extends Operator { + + compare: (x: T, y: T) => boolean; + + constructor(compare?: (x: T, y: T) => boolean) { + super(); + this.compare = compare; + } + + call(observer: Observer): Observer { + return new DistinctUntilChangedSubscriber(observer, this.compare); + } +} + +export class DistinctUntilChangedSubscriber extends Subscriber { + + value: T; + hasValue: boolean = false; + + constructor(destination: Observer, compare?: (x: T, y: T) => boolean) { + super(destination); + if (typeof compare === "function") { + this.compare = compare; + } + } + + compare(x: T, y: T) { + return x === y; + } + + _next(x) { + + let result: any = false; + + if(this.hasValue) { + result = tryCatch(this.compare)(this.value, x); + if (result === errorObject) { + this.destination.error(errorObject.e); + return; + } + } else { + this.hasValue = true; + } + + if (Boolean(result) === false) { + this.value = x; + this.destination.next(x); + } + } +} diff --git a/src/operators/distinctUntilKeyChanged.ts b/src/operators/distinctUntilKeyChanged.ts new file mode 100644 index 0000000000..94592b333d --- /dev/null +++ b/src/operators/distinctUntilKeyChanged.ts @@ -0,0 +1,10 @@ +import distinctUntilChanged from './distinctUntilChanged'; + +export default function distinctUntilKeyChanged(key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) { + return distinctUntilChanged.call(this, function(x, y) { + if (compare) { + return compare.call(thisArg, x[key], y[key]); + } + return x[key] === y[key]; + }); +}