Skip to content

Commit

Permalink
docs: Add more information about subscriptions inside Observable init…
Browse files Browse the repository at this point in the history
…ializers.
  • Loading branch information
benlesh committed Feb 4, 2021
1 parent 5d6e8c4 commit ef66a0c
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 93 deletions.
48 changes: 23 additions & 25 deletions docs_app/content/guide2/1-subscribing.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@

- Observables don't do anything until you call [subscribe](API)
- Calling [subscribe](API) will synchronously execute code that sets up the subscription, telling the underlying [producer](GL) to start pushing values to your code (the [consumer](GL)).
- [subscribe](API) may be called with _one function_ that is the [next](GL) handler, *OR* an [`Observer`](API) object, which has a [next](GL), [error](GL), and/or [complete](GL) handler on it.
- If no [error](GL) handler is provided, RxJS will assume all errors that are pushed are "unhandled" errors, and rethrow them on a different call stack.
- [subscribe](API) may be called with _one function_ that is the [next](GL) handler, _OR_ an [`Observer`](API) object, which has a [next](GL), [error](GL), and/or [complete](GL) handler on it.
- If no [error](GL) handler is provided, RxJS will assume all error notifications are "unhandled"; RxJS will rethrow them on a different call stack.

## Overview

Chances are, the first time you encounter an [Observable](API), it is because you have had one returned to you by some other API. In this case, the most helpful thing for you to know is how to get values from the observable, and the semantics of [subscription](GL)

> The first thing to know is, an observable doesn't "do" anything.
An observable, in fact, is _not_ a "stream". An observable is a template for [subscription](GL), through which your code (the [consumer](GL)), will tell a [producer](GL) to start pushing values to it by subscribing to an observable, creating a [subscription](GL).
An observable is actually _not_ a "stream"; Observables are templates for [subscriptions](GL), through which your code (the [consumer](GL)), will tell a [producer](GL) to start pushing values to it by subscribing to an observable, creating a [subscription](GL).

## Basic Subscription

```ts
const source$ = getSomeObservable();

source$.subscribe(value => {
// do something with each value as it is pushed to your code
console.log(value);
source$.subscribe((value) => {
// do something with each value as it is pushed to your code
console.log(value);
});
```

Expand All @@ -34,38 +34,36 @@ The most basic form of subscription is to subscribe to the observable with a sin
const source$ = getSomeObservable();

source$.subscribe({
next(value) {
// Do something with each value as it is pushed to your code
console.log(value);
},
error(err) {
// Whatever is producing and pushing values has encountered an error
// and will no longer send values
console.error(err);
},
complete() {
// The producer pushing values is done pushing values, and wishes to notify
// your code that you will no longer receive values.
console.log('done');
}
})
next(value) {
// Do something with each value as it is pushed to your code
console.log(value);
},
error(err) {
// Whatever is producing and pushing values has encountered an error
// and will no longer send values
console.error(err);
},
complete() {
// The producer pushing values is done pushing values, and wishes to notify
// your code that you will no longer receive values.
console.log('done');
},
});
```

An `Observer` is a type, basically any object with a `next`, `error`, and `complete` method on it. A "partial observer" is any object with at least one of those methods. It is a way to pass these three handlers to the subscription so that a [producer](GL) can notify your code (the [consumer](GL)). `next` is called when the producer pushes a value to the consumer. `error` is called when the producer has encountered an error and must stop sending values, and `complete` is called when the producer has pushed all values to the consumer, and will push no more.

An `Observer` is an interface, basically any object with a `next`, `error`, and `complete` method on it. A "partial observer" is any object with at least one of those methods. Observers are a means of passing three handlers to the subscription so that a [producer](GL) can notify your code (the [consumer](GL)). The `next` handler is called when the producer pushes a value to the consumer. The `error` handler is called when the producer has encountered an error and must stop sending values. The `complete` handler is called when the producer has pushed all values to the consumer and will push no more.

## Unhandled Errors

If you call subscribe with one function ([Basic Subscription](#Basic_Subscription)), or with a partial [observer](#Subscription_With_An_Observer) that does not have an [error](GL) handler, RxJS will treat all errors pushed by the [producer](GL) as "unhandled". This means that the developer did nothing to handle the error.

All unhandled errors will be rethrown in a different call context. This is done for a variety of reasons, but what is important to know is: Even if your observable is totally synchronous, an error from your observable _cannot be caught_ by wrapping the [`subscribe`](API) call in a `try-catch` block.


### Handled Errors

Simply by providing an [error](GL) callback to your [`subscribe`](API) call, via an [observer](#Subscription_With_An_Observer) (or by the deprecated error callback argument), RxJS will treat errors pushed by the [producer](GL) has "handled". This means whatever that error handler function does is all the handling that will be provided for that error.

---

<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://licensebuttons.net/l/by/4.0/80x15.png" /></a>
This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>
This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>
22 changes: 10 additions & 12 deletions docs_app/content/guide2/2-unsubscribing.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ On rare occasions, logic executed to [teardown](GL) a [subscription](GL) can thr
const subscription = source$.subscribe(console.log);

try {
subscription.unsubscribe();
subscription.unsubscribe();
} catch (unsubError) {
// Any error in here will be an UnsubscriptionError, unless you
// have other calls nested in your try { } block.
console.error(`Unsubscription encountered ${unsubError.errors.length} errors`);
for (const error of unsubError.errors) {
console.error(error);
}
// Any error in here will be an UnsubscriptionError, unless you
// have other calls nested in your try { } block.
console.error(`Unsubscription encountered ${unsubError.errors.length} errors`);
for (const error of unsubError.errors) {
console.error(error);
}
}
```

Expand All @@ -46,25 +46,23 @@ Well, no... but **WHEN IN DOUBT, UNSUBSCRIBE**. Unsubscription is important beca

- **Subscriptions that should stay active for the life of your server/web document**. If it needs to stay up for as long as the host environment is open, then there is no need to tear it down. **HOWEVER**. If you have a web application that can be mounted and unmounted, you will want to unsubscribe from all subscriptions owned by that web application. Otherwise, when unmounting the app it will leave it subscriptions active and hang onto resources.
- **Subscriptions that you know will complete that are delivering a value you always want to get**. For example, if you're loading some expensive to calculate data that you know you're going to use eventually, and you don't want to get it twice, you might choose to allow an observable to run to completion, even if the original consuming code no longer cares about the value.
- **Synchronous observables**. By the time you get the `Subscription` back from a synchronous observable, it is already `complete`, and it as already torndown. There is no need to keep the `Subscription` in memory or unsubscribe from it later (It won't hurt much if you do, but it's unnecessary). Examples of these are things like the result of [`of`](API), [`from`](API), [`range`](API), et al.
- **Synchronous observables**. By the time you get the `Subscription` back from a synchronous observable, it is already `complete`, and it has already torn down any underlying resources. There is no need to keep the `Subscription` in memory or unsubscribe from it later (It won't hurt much if you do, but it's unnecessary). Examples of these are things like the result of [`of`](API), [`from`](API), [`range`](API), et al.

### MUST unsubscribe:

- **Never-ending subscriptions**. If you do not unsubscribe from these, they will continue forever and consume memory and computing resources. Examples of this could be a web socket stream, or a simple interval. You don't need your app ticking along processing repetitive tasks it no longer cares about and consuming precious time on that single thread.
- **Long-running, expensive subscriptions**. Subscriptions whose actions or side effects are expensive or no longer necessary must be unsubscribed when no longer in use. This, again, is to free up processing capacity and memory. Large streaming results from HTTP or web sockets, even if you've engineered them to complete after some time, must be torn down when you no longer are interested in their results. This is done to prevent memory leaks and free up resources.
- **Subscriptions that register event handlers**. Subscriptions who register objects or functions (particularly functions with closures) with external event emitters and event targets must be torn down. Such things are a common cause of memory leaks, and functions registered and event handlers that close over other variables and objects (such as a component reference via `this`) can cause large things to be retained in memory indefinitely.

- **Subscriptions that register event handlers**. Subscriptions that register objects or functions (particularly functions with closures) with external event emitters and event targets must be torn down. Such things are a common cause of memory leaks, and functions registered and event handlers that close over other variables and objects (such as a component reference via `this`) can cause large things to be retained in memory indefinitely.

### SHOULD unsubscribe:

- **Single-value subscriptions you know will complete quickly**. Examples of this would be something like a quick HTTP GET or POST. It's not going to be the end of the world if you don't unsubscribe here. You're effectively opting for the same poor behavior you'd have gotten from a promise-based HTTP library in that case, as they do not have cancellation. However, this generally amounts to lazy programming, so it should still be avoided if possible.


## What About takeUntil and "Parent Subscriptions", etc?

We will cover that in [Advanced Subscription Management](LINK) later in the guide.

---

<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://licensebuttons.net/l/by/4.0/80x15.png" /></a>
This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>
This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>
147 changes: 91 additions & 56 deletions docs_app/content/guide2/3-creating-an-observable.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,44 @@ A creation function, in RxJS terms, is simply a standalone function provided by
- [`timer`](API)
- [`defer`](API)
- [`bindCallback`](API)
- et al...
- et al...

It is highly recommended that you try to use these functions to create your observable. They are well tested, and used in a huge number of projects, and can generally be composed to create most observables you need.

## Observable Construction

Sometimes, however, RxJS does not offer exactly the creation function you're looking for. Perhaps you need to wrap some other API to create an observable, or there's some bit of nuance to the RxJS implementation you don't like. In that case, you can do what RxJS does with the creation functions under the hood, and use the `Observable` [constructor](API) directly.


### Common Case

In the most common case, you'll be wrapping types that asynchronously deliver data. Here's how the observable constructor is used, in a basic case with a contrived type called `SomeDataService`:

```ts
const source$ = new Observable((subscriber) => {
// Set up you producer (if you need to) here
const dataService = new SomeDataService();

// Tie your subscriber to your producer here
dataService.ondata = (data) => {
// Push N values to your consumer with `subscriber.next`
subscriber.next(data);
};

dataService.onerror = (err) => {
// if you have an error, notify the consumer with `subscriber.error`
subscriber.error(err);
};

dataService.onclose = () => {
// If the producer is done pushing values, notify the consumer
// with `subscriber.complete`
subscriber.complete();
};

return () => {
// Teardown logic goes here
dataService.destroy();
};
// Set up you producer (if you need to) here
const dataService = new SomeDataService();

// Tie your subscriber to your producer here
dataService.ondata = (data) => {
// Push N values to your consumer with `subscriber.next`
subscriber.next(data);
};

dataService.onerror = (err) => {
// if you have an error, notify the consumer with `subscriber.error`
subscriber.error(err);
};

dataService.onclose = () => {
// If the producer is done pushing values, notify the consumer
// with `subscriber.complete`
subscriber.complete();
};

return () => {
// Teardown logic goes here
dataService.destroy();
};
});
```

Expand All @@ -81,18 +80,19 @@ Let's take a look at the most basic "synchronous firehose":

```ts
// BAD: this is going to lock your thread on subscribe!
const firehose$ = new Observable(subscriber => {
let n = 0;
while (true) {
subscriber.notify(n++);
}
const firehose$ = new Observable((subscriber) => {
let n = 0;
while (true) {
subscriber.notify(n++);
}
});

firehose$.pipe(
firehose$
.pipe(
// We only wanted (and will only get) 10 values!
take(10)
)
.subscribe(console.log); // Doom here.
)
.subscribe(console.log); // Doom here.
```

The code above will indeed only log 10 values, but it will still lock your thread. So what gives? The answer may seem obvious: There's nothing to stop that `while` loop in the event that the `subscriber` can no longer push values to the [consumer](GL).
Expand All @@ -103,43 +103,78 @@ Synchronously, when a subscriber has called `complete` or `error`, or when a con

```ts
// BAD: this is going to lock your thread on subscribe!
const firehose$ = new Observable(subscriber => {
let n = 0;
// This check will prevent this loop from running forever
// as long as there is a `take`, etc.
while (!subscriber.closed) {
subscriber.notify(n++);
}
const firehose$ = new Observable((subscriber) => {
let n = 0;
// This check will prevent this loop from running forever
// as long as there is a `take`, etc.
while (!subscriber.closed) {
subscriber.notify(n++);
}
});

firehose$.pipe(
firehose$
.pipe(
// We only wanted (and will only get) 10 values!
take(10)
)
.subscribe(console.log); // Much better.
)
.subscribe(console.log); // Much better.
```

### Returning Subscription From Initialization
### Dealing With Inner Subscriptions

Quite often, when creating operators, or sometimes when creating functions that return observables, you'll find that you need to subscribe to other "inner" observables in the initialization function of the observable you're creating. These inner observables should be properly chained.

It's always okay to return a teardown function that calls `unsubscribe` on the inner subscription, but there are other useful methods of chaining subscriptions that RxJS provides.

For helpful ergonomics, if you return a [`Subscription`](API) object as [teardown](GL) from your initialization function, RxJS will handle that appropriately. There is no need to wrap an RxJS [`Subscription`](API) in a `() => void` function.
#### Returning Subscription From Initialization

This is, in fact, how [operators](GL) are created, which we will get to in a bit.
For helpful ergonomics, if you return a [`Subscription`](API) object as [teardown](GL) from your initialization function, RxJS will handle that appropriately. There is no need to wrap an RxJS [`Subscription`](API) in a `() => void` function. All functions and subscriptions returned from the initialization function are technically added to the `subscriber` as a subscription.

This is often how [operators](GL) are created, which we will get to in a bit.

```ts
const originalSource$ = interval(1000);

const mySource$ = new Observable(subscriber => {
const subscription = originalSource$.subscribe({
next: value => subscriber.next(value + value),
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
const mySource$ = new Observable((subscriber) => {
const subscription = originalSource$.subscribe({
next: (value) => subscriber.next(value + value),
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
});

return subscription;
})l
return subscription;
});
```

#### Adding Inner Subscriptions To The Subscriber

Subscribers are both "safe" observers, providing guarantees mentioned above, as well as Subscriptions themselves. This means that subscribers have `add`, `remove`, and `unsubscribe` methods. We discuss this in a little more detail in [Advanced Subscription Management](LINK) later on. But, in short, the `add` and `remove` allow you to add and remove "child subscriptions" to a parent. Child subscriptions are automatically unsubscribed when the parent is unsubscribed. If a child subscription is unsubscribed undependently from the parent, it is removed from the parent automatically to free up memory.

```ts
const mySource$ = new Observable((subscriber) => {
// This inner subscription to `originalSource$` will
// be unsubscribed during teardown.
subscriber.add(
originalSource$.subscribe({
next: (value) => subscriber.next(value + value),
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
})
);

subscriber.add(() => {
// this will be called during teardown
});

return () => {
// This is also technically added to
// the subscriber with `subscriber.add`
// immediately after being returned.
};
});
```

---

<a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://licensebuttons.net/l/by/4.0/80x15.png" /></a>
This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>
This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International License</a>

0 comments on commit ef66a0c

Please sign in to comment.