Skip to content

Commit

Permalink
chore(merge): merge commit
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Aug 9, 2017
2 parents 9db141c + d2a32f9 commit bcde577
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 116 deletions.
47 changes: 46 additions & 1 deletion doc/operator-creation.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ For how to develop a custom operator for *this* library, [see below](#advanced).

## DIY Custom Operators for End Users


### Guidelines

In the most common case, users might like to create an operator to be used only by their app. These can be developed in
Expand All @@ -30,6 +29,7 @@ any way the developer sees fit, but here are some guidelines:
<!-- share-code-between-examples -->
### Example

<!-- skip-example -->
```js
function mySimpleOperator(someCallback) {
// We *could* do a `var self = this;` here to close over, but see next comment
Expand Down Expand Up @@ -63,12 +63,14 @@ There are a few ways to do this. It's really down to needs and preference:

1) Use the ES7 function bind operator (`::`) available in transpilers like [BabelJS](http://babeljs.io):

<!-- skip-example -->
```js
someObservable::mySimpleOperator(x => x + '!');
```

2) Create your own Observable subclass and override `lift` to return it:

<!-- skip-example -->
```js
class MyObservable extends Observable {
lift(operator) {
Expand All @@ -90,6 +92,7 @@ MyObservable.prototype.mySimpleOperator = mySimpleOperator;

3) Patch `Observable.prototype` directly:

<!-- skip-example -->
```js
Observable.prototype.mySimpleOperator = mySimpleOperator;

Expand All @@ -98,9 +101,51 @@ Observable.prototype.mySimpleOperator = mySimpleOperator;
someObservable.mySimpleOperator(x => x + '!');
```

### Operator as a pure function

If you don't want to patch the Observable prototype, you can also write the operator as a pure function that takes the input Observable as argument, instead of relying on the `this` keyword.

Example implementation:

<!-- skip-example -->
```js
function mySimpleOperator(someCallback) {
// notice that we return a function here
return function mySimpleOperatorImplementation(source) {
return Observable.create(subscriber => {
var subscription = source.subscribe(value => {
try {
subscriber.next(someCallback(value));
} catch(err) {
subscriber.error(err);
}
},
err => subscriber.error(err),
() => subscriber.complete());

return subscription;
});
}
}
```

This can now be used with the `let()` method on the Observable:

<!-- skip-example -->
```js
const obs = someObservable.let(mySimpleOperator(x => x + '!'));
```

## Publish your operator as a separate library

We strongly recommend you to publish your custom operator as an npm package as a pure function to be used with `let`. See the sections above. RxJS core already has over 100 operators, and we should not add more operators unless they are absolutely essential and add functionality not possible with existing operators.

Publishing it as a separate library will guarantee your operator to be immediately usable by the community, and we can start growing an RxJS ecosystem as opposed to making the RxJS library thicker and heavier. There are cases, however, where the new operator should be added to the core library.

## <a id="advanced"></a>Creating An Operator For Inclusion In *This* Library

**Please publish your operator as a separate library before proposing it into RxJS.** See the section above.

__To create an operator for inclusion in this library, it's probably best to work from prior art__. Something
like the `filter` operator would be a good start. It's not expected that you'll be able to read
this section and suddenly be an expert operator contributor.
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
"commitizen": "^2.8.6",
"coveralls": "^2.11.13",
"cz-conventional-changelog": "^1.2.0",
"danger": "^0.21.0",
"danger": "^1.1.0",
"doctoc": "^1.0.0",
"escape-string-regexp": "^1.0.5 ",
"esdoc": "^0.4.7",
Expand Down Expand Up @@ -196,7 +196,7 @@
"tslint": "^4.4.2",
"typescript": "~2.0.6",
"typings": "^2.0.0",
"validate-commit-msg": "^2.3.1",
"validate-commit-msg": "^2.14.0",
"watch": "^1.0.1",
"webpack": "^1.13.1",
"xmlhttprequest": "1.8.0"
Expand Down
124 changes: 124 additions & 0 deletions spec/exports-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { expect } from 'chai';
import { bindCallback } from '../dist/cjs/observable/bindCallback';
import { bindNodeCallback } from '../dist/cjs/observable/bindNodeCallback';
import { combineLatest } from '../dist/cjs/observable/combineLatest';
import { concat } from '../dist/cjs/observable/concat';
import { defer } from '../dist/cjs/observable/defer';
import { empty } from '../dist/cjs/observable/empty';
import { forkJoin } from '../dist/cjs/observable/forkJoin';
import { from } from '../dist/cjs/observable/from';
import { fromEvent } from '../dist/cjs/observable/fromEvent';
import { fromEventPattern } from '../dist/cjs/observable/fromEventPattern';
import { fromPromise } from '../dist/cjs/observable/fromPromise';
import { _if } from '../dist/cjs/observable/if';
import { interval } from '../dist/cjs/observable/interval';
import { merge } from '../dist/cjs/observable/merge';
import { never } from '../dist/cjs/observable/never';
import { of } from '../dist/cjs/observable/of';
import { onErrorResumeNext } from '../dist/cjs/observable/onErrorResumeNext';
import { pairs } from '../dist/cjs/observable/pairs';
import { race } from '../dist/cjs/observable/race';
import { range } from '../dist/cjs/observable/range';
import { _throw } from '../dist/cjs/observable/throw';
import { timer } from '../dist/cjs/observable/timer';
import { using } from '../dist/cjs/observable/using';
import { zip } from '../dist/cjs/observable/zip';
import * as Rx from '../dist/cjs/Rx';

describe('exports', () => {
it('should have rxjs/observable/bindCallback', () => {
expect(bindCallback).to.equal(Rx.Observable.bindCallback);
});

it('should have rxjs/observable/bindNodeCallback', () => {
expect(bindNodeCallback).to.equal(Rx.Observable.bindNodeCallback);
});

it('should have rxjs/observable/combineLatest', () => {
expect(combineLatest).to.equal(Rx.Observable.combineLatest);
});

it('should have rxjs/observable/concat', () => {
expect(concat).to.equal(Rx.Observable.concat);
});

it('should have rxjs/observable/defer', () => {
expect(defer).to.equal(Rx.Observable.defer);
});

it('should have rxjs/observable/empty', () => {
expect(empty).to.equal(Rx.Observable.empty);
});

it('should have rxjs/observable/forkJoin', () => {
expect(forkJoin).to.equal(Rx.Observable.forkJoin);
});

it('should have rxjs/observable/from', () => {
expect(from).to.equal(Rx.Observable.from);
});

it('should have rxjs/observable/fromEvent', () => {
expect(fromEvent).to.equal(Rx.Observable.fromEvent);
});

it('should have rxjs/observable/fromEventPattern', () => {
expect(fromEventPattern).to.equal(Rx.Observable.fromEventPattern);
});

it('should have rxjs/observable/fromPromise', () => {
expect(fromPromise).to.equal(Rx.Observable.fromPromise);
});

it('should have rxjs/observable/if', () => {
expect(_if).to.equal(Rx.Observable.if);
});

it('should have rxjs/observable/interval', () => {
expect(interval).to.equal(Rx.Observable.interval);
});

it('should have rxjs/observable/merge', () => {
expect(merge).to.equal(Rx.Observable.merge);
});

it('should have rxjs/observable/never', () => {
expect(never).to.equal(Rx.Observable.never);
});

it('should have rxjs/observable/of', () => {
expect(of).to.equal(Rx.Observable.of);
});

it('should have rxjs/observable/onErrorResumeNext', () => {
expect(onErrorResumeNext).to.equal(Rx.Observable.onErrorResumeNext);
});

it('should have rxjs/observable/pairs', () => {
expect(pairs).to.equal(Rx.Observable.pairs);
});

it('should have rxjs/observable/race', () => {
expect(race).to.equal(Rx.Observable.race);
});

it('should have rxjs/observable/range', () => {
expect(range).to.equal(Rx.Observable.range);
});

it('should have rxjs/observable/throw', () => {
expect(_throw).to.equal(Rx.Observable.throw);
});

it('should have rxjs/observable/timer', () => {
expect(timer).to.equal(Rx.Observable.timer);
});

it('should have rxjs/observable/using', () => {
expect(using).to.equal(Rx.Observable.using);
});

it('should have rxjs/observable/zip', () => {
expect(zip).to.equal(Rx.Observable.zip);
});
});
6 changes: 3 additions & 3 deletions src/add/observable/generate.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Observable } from '../../Observable';
import { GenerateObservable } from '../../observable/GenerateObservable';
import { generate as staticGenerate } from '../../observable/generate';

Observable.generate = GenerateObservable.create;
Observable.generate = staticGenerate;

declare module '../../Observable' {
namespace Observable {
export let generate: typeof GenerateObservable.create;
export let generate: typeof staticGenerate;
}
}
6 changes: 3 additions & 3 deletions src/add/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Observable } from '../../Observable';
import { onErrorResumeNextStatic } from '../../operator/onErrorResumeNext';
import { onErrorResumeNext as staticOnErrorResumeNext } from '../../observable/onErrorResumeNext';

Observable.onErrorResumeNext = onErrorResumeNextStatic;
Observable.onErrorResumeNext = staticOnErrorResumeNext;

declare module '../../Observable' {
namespace Observable {
export let onErrorResumeNext: typeof onErrorResumeNextStatic;
export let onErrorResumeNext: typeof staticOnErrorResumeNext;
}
}
6 changes: 3 additions & 3 deletions src/add/observable/race.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Observable } from '../../Observable';
import { race as raceStatic } from '../../observable/race';
import { race as staticRace } from '../../observable/race';

Observable.race = raceStatic;
Observable.race = staticRace;

declare module '../../Observable' {
namespace Observable {
export let race: typeof raceStatic;
export let race: typeof staticRace;
}
}
2 changes: 1 addition & 1 deletion src/observable/FromObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class FromObservable<T> extends Observable<T> {
return new FromObservable<T>(ish, scheduler);
} else if (isArray(ish)) {
return new ArrayObservable<T>(ish, scheduler);
} else if (isPromise(ish)) {
} else if (isPromise<T>(ish)) {
return new PromiseObservable<T>(ish, scheduler);
} else if (typeof ish[Symbol_iterator] === 'function' || typeof ish === 'string') {
return new IteratorObservable<T>(ish, scheduler);
Expand Down
3 changes: 3 additions & 0 deletions src/observable/generate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { GenerateObservable } from './GenerateObservable';

export const generate = GenerateObservable.create;
3 changes: 3 additions & 0 deletions src/observable/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { onErrorResumeNextStatic } from '../operator/onErrorResumeNext';

export const onErrorResumeNext = onErrorResumeNextStatic;
101 changes: 2 additions & 99 deletions src/observable/race.ts
Original file line number Diff line number Diff line change
@@ -1,100 +1,3 @@
import { Observable } from '../Observable';
import { isArray } from '../util/isArray';
import { ArrayObservable } from '../observable/ArrayObservable';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Subscription, TeardownLogic } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { raceStatic } from '../operator/race';

/**
* Returns an Observable that mirrors the first source Observable to emit an item.
* @param {...Observables} ...observables sources used to race for which Observable emits first.
* @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
* @static true
* @name race
* @owner Observable
*/
export function race<T>(observables: Array<Observable<T>>): Observable<T>;
export function race<T>(observables: Array<Observable<any>>): Observable<T>;
export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): Observable<T>;
export function race<T>(...observables: Array<Observable<any> | Array<Observable<any>>>): Observable<T> {
// if the only argument is an array, it was most likely called with
// `race([obs1, obs2, ...])`
if (observables.length === 1) {
if (isArray(observables[0])) {
observables = <Array<Observable<any>>>observables[0];
} else {
return <Observable<any>>observables[0];
}
}

return new ArrayObservable<T>(<any>observables).lift(new RaceOperator<T>());
}

export class RaceOperator<T> implements Operator<T, T> {
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RaceSubscriber(subscriber));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
private hasFirst: boolean = false;
private observables: Observable<any>[] = [];
private subscriptions: Subscription[] = [];

constructor(destination: Subscriber<T>) {
super(destination);
}

protected _next(observable: any): void {
this.observables.push(observable);
}

protected _complete() {
const observables = this.observables;
const len = observables.length;

if (len === 0) {
this.destination.complete();
} else {
for (let i = 0; i < len && !this.hasFirst; i++) {
let observable = observables[i];
let subscription = subscribeToResult(this, observable, observable, i);

if (this.subscriptions) {
this.subscriptions.push(subscription);
}
this.add(subscription);
}
this.observables = null;
}
}

notifyNext(outerValue: T, innerValue: T,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, T>): void {
if (!this.hasFirst) {
this.hasFirst = true;

for (let i = 0; i < this.subscriptions.length; i++) {
if (i !== outerIndex) {
let subscription = this.subscriptions[i];

subscription.unsubscribe();
this.remove(subscription);
}
}

this.subscriptions = null;
}

this.destination.next(innerValue);
}
}
export const race = raceStatic;
Loading

0 comments on commit bcde577

Please sign in to comment.