Using AbortSignal and AbortController #3122
Comments
Also cc @domenic who might have interest. |
So before, |
@felixfbecker No, not at all. It doesn't change anything semantically for the Observable type, just the cancellation mechanism. In the short term, it would be an additional feature rather than a replacement. |
I could see that as a confusion point because that is what the Promise returned by |
I'm not sure we could do that, it would make error handling pretty much awful, as any unsubscription would be treated like an error. In fetch it makes a little sense, because Promises are always multicast... however cancellation is not an error state. Seems like, if anything, a finally clause would need to be notified in a Promise, but that's a matter for debate elsewhere. I just know with Observable it seems like a really bad idea. |
After talking with @jhusain about this last night, I think we're going to add an
|
I'm not so sure about this. Does this mean document.body.on("mousemove").takeUntil(document.body.on("click")).forEach(e => { ... }); would result in a (here-unhandled) rejected promise? It feels OK to me to use the AbortSignal mechanism to communicate cancelations without using "AbortError". |
@domenic A trickier one would be |
Ah, right, in that case I don't have any concerns, especially since I've never used/don't understand switchMap :). |
Well, OK, no, maybe I have concerns. It's weird that the equivalent of removeEventListener() would cause an "exception". This isn't exceptional. |
Maybe I need to disambiguate this a little.
In the case of using In the case of using |
I don't think that is the right dimension along which to mirror fetch. In fetch, canceling is at least somewhat exceptional; it prevents a response from being received. Removing an event listener is not exceptional. |
@domenic Interesting, so you're saying you would not reject the promise returned by |
Yeah, I don't think I would reject the promise. A function that accepts an AbortSignal can decide how to react to the news of the process being aborted. If it means premature, somewhat-exceptional termination, it can return a rejected promise. If it's expected termination, and this is just one signalling channel, it can return a fulfilled promise. It's up to the function in question. For forEach, it seems expected. Now, if the observable was created via something like const o = new Observable((observer, signal) => {
signal.addEventListener("abort", () => {
observer.error(new DOMException("...", "AbortError"));
});
}); then |
@domenic ... I see. So you're saying leave it to the Observable creator to decide whether or not to send the AbortSignal? That makes sense. What about for the case of const o = new Observable((observer, signal) => {
signal.on('abort').forEach(() => {
observer.error(new DOMException('', 'AbortError'));
}, otherSignal);
}); ... if |
Assuming you mean |
@domenic I meant in a world where EventTarget has an I think I'm totally with you that the Observable creator should be left to decide whether or not an abort signals an error. |
Right, I don't think observables created by on() should be ones that treat unsubscription/aborting as an error. |
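To make the distinction above concrete, here is a minimal TypeScript sketch. It is not RxJS or the TC39 proposal: `Observer`, `Producer`, and this standalone `forEach` are hypothetical stand-ins, and the error is a plain `Error` named "AbortError" rather than a real `DOMException`. Under those assumptions, `forEach` treats abort as expected termination and fulfills, unless the producer itself chose to report the abort through `observer.error`.

```typescript
// Hypothetical minimal types for illustration; not the RxJS or TC39 API.
type Observer<T> = {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
};
type Producer<T> = (observer: Observer<T>, signal: AbortSignal) => void;

// Assumption: abort is an expected way to stop, so the promise fulfills
// unless the producer itself reports the abort through observer.error.
function forEach<T>(
  producer: Producer<T>,
  fn: (value: T) => void,
  signal: AbortSignal
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    let settled = false;
    const settle = (action: () => void) => {
      if (!settled) {
        settled = true;
        action();
      }
    };
    if (signal.aborted) {
      settle(() => resolve());
      return;
    }
    // The producer runs first, so any abort listener it registers fires
    // before ours and gets to decide that abort is an error.
    producer(
      {
        next: (value) => { if (!settled) fn(value); },
        error: (err) => settle(() => reject(err)),
        complete: () => settle(() => resolve()),
      },
      signal
    );
    signal.addEventListener("abort", () => settle(() => resolve()), { once: true });
  });
}

// An event-like producer: abort just stops it, nothing exceptional.
const quiet: Producer<number> = () => {};

// A producer that opts in to treating abort as an error.
const strict: Producer<number> = (observer, signal) => {
  signal.addEventListener("abort", () => {
    const err = new Error("aborted");
    err.name = "AbortError"; // stand-in for a DOMException named AbortError
    observer.error(err);
  });
};
```

With `quiet`, aborting the signal fulfills the promise returned by `forEach`; with `strict`, the same abort rejects it. The decision sits with whoever wrote the producer.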
Observable.prototype.switchMap = function(sel: (x: any) => Observable) {
let source = this;
return new Observable((sink) => {
let outerSubscription = new Subscription();
let innerSubscription = new Subscription();
outerSubscription.add(source.subscribe((x) => {
innerSubscription.unsubscribe();
outerSubscription.remove(innerSubscription);
outerSubscription.add(innerSubscription = sel(x).subscribe({
next(x) { sink.next(x); },
error: sink.error.bind(sink),
complete: sink.complete.bind(sink)
}));
}, sink.error.bind(sink), sink.complete.bind(sink)));
return outerSubscription;
});
} It's useful in cases where you're mapping discrete events into asynchronous actions, and the arrival of a new event should cancel the current pending action and switch to the latest asynchronous action: tabNavigatorSelections
.switchMap((selectedTab) => loadTabContent(selectedTab))
.subscribe((selectedTabContent) => render(selectedTabContent));
mouseDownEvents
.switchMap((down) => mouseMoveEvents.takeUntil(mouseUp))
.subscribe((moveEvent) => doDrag(moveEvent)) |
I'm currently thinking that in a world where observable is using AbortSignal internally, we might want to provide an AbortSignal to flattening functions. Use case: clicks.switchMap((e, i, signal) => fetch(url, { signal })) 🤔 Then again I'd worry a little bit that people would use it for silly things inside of that function. |
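The idea of handing a signal to the flattening function can be sketched without RxJS. `createSwitcher` below is a hypothetical helper, not an existing operator. It assumes the projection returns a promise and that results and errors from superseded runs should be dropped silently.

```typescript
// Hypothetical helper, not an RxJS operator: each new value aborts the
// signal given to the previous projection before starting the next one.
function createSwitcher<T, R>(
  project: (value: T, index: number, signal: AbortSignal) => Promise<R>,
  onResult: (result: R) => void,
  onError: (err: unknown) => void
) {
  let current: AbortController | undefined;
  let index = 0;
  return {
    next(value: T): void {
      current?.abort(); // cancel the pending action, if any
      const controller = new AbortController();
      current = controller;
      project(value, index++, controller.signal).then(
        (result) => {
          if (!controller.signal.aborted) onResult(result);
        },
        (err) => {
          // A rejection from a superseded run (for example an AbortError)
          // is ignored; only the latest run can report an error.
          if (!controller.signal.aborted) onError(err);
        }
      );
    },
    unsubscribe(): void {
      current?.abort();
      current = undefined;
    },
  };
}
```

With `fetch`, the projection would be `(e, i, signal) => fetch(url, { signal })`, so a new click aborts the request started by the previous one.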
FWIW people can do it like this: switchMap(
() =>
new Observable(observer => {
const abortController = new AbortController()
fetch(url, { signal: abortController.signal })
.then(res => {
observer.next(res)
observer.complete()
})
.catch(err => observer.error(err))
return () => abortController.abort()
})
) |
@felixfbecker in a world where we were using AbortSignal as well, it would be easier than that, as we'd be providing a switchMap(
() =>
new Observable((observer, signal) => {
fetch(url, { signal })
.then(res => {
observer.next(res)
observer.complete()
})
.catch(err => observer.error(err))
})
) I'm just trying to think of ways to make that more ergonomic. I'm tired of everyone maintaining an HTTP client. |
After starting to implement a toy version of this, I've decided that it would not work out. The problem became clear after some discussion with @jhusain. The reasons it won't work are a bit nuanced:
The truth is that streams like Observables are made to be subscribed to and unsubscribed from, and it's not an error if you unsubscribe, it's expected. So while As of right now, I think I'll close this issue. 😢 I'm still interested in reading anyone's thoughts, and I'm happy to reopen if necessary. |
So, their behavior is completely up to us. They only reject forEach because we decided that. And we only decided that because @jhusain convinced me nobody would use forEach for subscription, essentially. If we think people will use forEach for a subscription where they don't care about the completion value, we should not reject with AbortError. |
@domenic I thought the nasty issue with not rejecting the promise returned by forEach is ambiguous behavior between "complete" and "unsubscribed" within an async function: async function foo() {
await someObservable.forEach(signal);
doThisWhenComplete();
} ... where some third party to the execution of It sucks. I really want this to work, but currently I'm not seeing a great solution. Hopefully I'm just near-sighted. haha. |
Without completely changing how Observables work, would it still be possible to improve the interoperability for the use case of calling abortable async functions? Taking from ideas proposed in this thread, provide an switchMap((val, i, signal) => fetch(url, { signal }))
defer(signal => fetch(url, { signal }))
new Observable((subscriber, signal) => {
fetch(url, { signal })
.then(res => {
subscriber.next(res)
subscriber.complete()
})
.catch(err => subscriber.error(err))
}) There would be no change in behaviour - when the Observable is unsubscribed from, it first runs the unsubscribe logic, then also aborts the AbortSignal. So whether the Promise rejects with an AbortError or not doesn't matter anymore because the consumer unsubscribed and will never know. It is nothing but a convenience to make it easier to abort unneeded work. There are lots of use cases where Observables and their "don't care" unsubscribe semantics are the right abstraction (e.g. events), but as part of an Observable chain you often need to trigger async work, and for a simple async function like The primary difference to the proposal in the OP is that this strictly improves the conversion from Promise/AbortSignal -> Observable, not the other way around. I.e. it answers "I have a function that takes AbortSignal, how can I get an Observable", it intentionally does not solve "I have a function that returns an Observable and an AbortSignal, how can I cancel it" because I don't think the use case is as strong for that, and it's not hard to do manually. |
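The ordering described here, where unsubscribing first silences the consumer and then aborts the signal, can be sketched as follows. `subscribeWithSignal` and the `Observer` type are hypothetical names for illustration, not the API of RxJS or of any library mentioned in this thread.

```typescript
// Hypothetical observer shape for illustration.
type Observer<T> = {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
};

// Hypothetical helper: runs a producer with an AbortSignal that is aborted
// when the returned unsubscribe function is called.
function subscribeWithSignal<T>(
  producer: (observer: Observer<T>, signal: AbortSignal) => void,
  observer: Observer<T>
): () => void {
  const controller = new AbortController();
  let closed = false;
  const safeObserver: Observer<T> = {
    next: (value) => {
      if (!closed) observer.next(value);
    },
    error: (err) => {
      if (!closed) {
        closed = true;
        observer.error(err);
      }
    },
    complete: () => {
      if (!closed) {
        closed = true;
        observer.complete();
      }
    },
  };
  producer(safeObserver, controller.signal);
  return () => {
    // Close first, then abort: an AbortError raised by the aborted work
    // reaches safeObserver.error but is never shown to the consumer.
    closed = true;
    controller.abort();
  };
}
```

Because the observer is closed before `abort()` runs, it makes no difference whether the aborted work rejects or not.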
Ping @benlesh, thoughts? |
I wrapped my proposal into a library of functions that can be used as drop-in replacement for RxJS factories/operators where AbortSignal makes sense: https://github.com/felixfbecker/abortable-rx |
@benlesh it would be nice if at least |
Has this had any movement in the last 3/4 year? I just sat down today to replace all my native |
@thw0rted you can try the helpers I linked above |
Thanks Felix. I'm probably not going to add the whole library but I will compare your approach to what I worked out myself. |
@thw0rted Just curious, have you tried using private abortSignal$$ = new Subject();
public abort() {
this.abortSignal$$.next();
}
public loadData(): Observable<Data[]> {
return this.http.get<Data[]>(`/api/data`).pipe(takeUntil(this.abortSignal$$));
} Perhaps I'm missing a nuance here...it may be that using |
That's a neat idea! I'm an Observables yellow-belt at best, so I would never have come up with that on my own, but it seems like it gets the spirit of the thing. My understanding is that the underlying XHR / fetch call is supposed to be aborted when the last subscriber unsubscribes from the returned observable. So, as long as This also simplifies my code -- I can basically write a tiny wrapper to turn an AbortSignal into a Subject: public load(signal?: AbortSignal) {
const ret = this.http.get<Data[]>(`/api/data`);
if (!signal) { return ret; }
const subj = new Subject();
signal.addEventListener("abort", () => subj.next(), {once: true});
return ret.pipe(takeUntil(subj));
} |
It's a bit more nuanced:
|
Revised per Felix's suggestions, in case anybody finds it helpful: // Like `Observable#toPromise` but unsubscribes when the passed signal aborts
export function observableToPromise<T>(obs: Observable<T>, signal?: AbortSignal): Promise<T> {
if (!signal) { return obs.toPromise(); }
if (signal.aborted) { return NEVER.toPromise(); }
const stop = fromEvent(signal, "abort").pipe(take(1));
return obs.pipe(takeUntil(stop)).toPromise();
} |
Nice. Just one last tip: You probably want to reject the Promise with an The test suite of |
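A sketch of a rejecting variant, written without RxJS so it stays self-contained. `toAbortablePromise` and `Subscribe` are hypothetical names, and the error is a plain `Error` whose `name` is set to "AbortError" as a stand-in for the `DOMException` that `fetch` rejects with. The choice to reject on abort is an assumption of this sketch.

```typescript
// Hypothetical observer shape for illustration.
type Observer<T> = {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
};
// Hypothetical shape: subscribing returns an unsubscribe function.
type Subscribe<T> = (observer: Observer<T>) => () => void;

// Resolves with the last value on completion; unsubscribes and rejects with
// an AbortError-like error when the signal aborts.
function toAbortablePromise<T>(
  subscribe: Subscribe<T>,
  signal?: AbortSignal
): Promise<T | undefined> {
  return new Promise<T | undefined>((resolve, reject) => {
    const abortError = () => {
      const err = new Error("The operation was aborted");
      err.name = "AbortError";
      return err;
    };
    if (signal?.aborted) {
      reject(abortError());
      return;
    }
    let last: T | undefined;
    let unsubscribe: () => void = () => {};
    const onAbort = () => {
      unsubscribe();
      reject(abortError());
    };
    // Remove the listener once the source finishes on its own.
    const cleanup = () => signal?.removeEventListener("abort", onAbort);
    signal?.addEventListener("abort", onAbort, { once: true });
    unsubscribe = subscribe({
      next: (value) => { last = value; },
      error: (err) => { cleanup(); reject(err); },
      complete: () => { cleanup(); resolve(last); },
    });
  });
}
```

A caller can then tell the two outcomes apart by checking `err.name === "AbortError"` in a `catch` block.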
From my point of view, cancellation is just an Observable with a specific context. It is an action, invoked by some entity, that represents canceling something. This definition is pretty close to what an Observable is (an object that, when observed, emits a representation of data). But this idea tells us more about what an Observable is. If cancellation is an Observable in the context of canceling something, and some Observables need it, then it could be that an Observable, as a reactive program requires, fundamentally reacts to another Observable outside itself. const fromArray = array => new Observable((next, externalObservable) => {
let cancelled = false;
// listen to an external observable that could
// probably emit a cancellation context (token)
// as an indication to break the loop
externalObservable
.filter(value => value === 'CANCEL')
.take(1) // automatic unsubscribe external observable
.subscribe(() => (cancelled = true))
for (let index = 0; index < array.length; index++) {
if (cancelled) break;
next(array[index]);
}
});
const subject = new Subject();
fromArray([1, 2, 3, 4, 5])
.subscribe(
value => value === 3 && subject.next('CANCEL'), // emit a cancellation context
subject // pass an observable that could emit a cancellation context
) Pseudo-code above shows that If the idea that cancellation is just another form of Observable is correct and fundamental new Observable(next => next('CANCEL'));
fromEvent(button, 'click').mapTo('CANCEL');
timeout(1000).mapTo('CANCEL') then it could be that an Observable fundamentally reacts to another Observable. This is not limited to the cancellation context: it could be used for aborting fetch requests, pausing an observable, pulling data manually from an observable, and much more. I've explained the idea further in this article (https://github.com/cedmandocdoc/redefining-observable) and provided working examples to play with. |
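For readers who want to run the idea, here is a small self-contained TypeScript variant of the pseudo-code above. `MiniSubject` is a hypothetical stand-in for the external observable, and `fromArray` takes the callback and the external source as plain arguments instead of going through an Observable constructor. Those simplifications are assumptions of this sketch, not part of the linked article.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical minimal subject, standing in for an "external observable".
class MiniSubject<T> {
  private listeners = new Set<Listener<T>>();

  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  next(value: T): void {
    // Copy first so listeners may unsubscribe while being notified.
    for (const listener of Array.from(this.listeners)) listener(value);
  }
}

// Emits array items until the external source emits "CANCEL".
function fromArray<T>(
  array: T[],
  next: Listener<T>,
  external: MiniSubject<string>
): void {
  let cancelled = false;
  const stopListening = external.subscribe((value) => {
    if (value === "CANCEL") cancelled = true;
  });
  for (const item of array) {
    if (cancelled) break;
    next(item);
  }
  stopListening();
}

const subject = new MiniSubject<string>();
const seen: number[] = [];
fromArray(
  [1, 2, 3, 4, 5],
  (value) => {
    seen.push(value);
    if (value === 3) subject.next("CANCEL"); // emit a cancellation context
  },
  subject
);
// seen is now [1, 2, 3]: the loop stops before emitting 4.
```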
The standards proposals are leaning towards using AbortSignal and AbortController (a la fetch cancellation) for cancellation with Observables. I generally find this favorable for a few reasons and I think I see a migration path for us.
Proposal
- subscribe accepts Observer and an optional AbortSignal.
- forEach accepts an optional AbortSignal.
- new Observable(subscriber) gives both a safe Observer and an AbortSignal, but is expected to return void.
- … Observer passed to subscribe will have an optional abort handler that will get the AbortError if the observable is aborted.
- … forEach, the AbortError would go down the rejection path just like it does with fetch. Since the returned promise is a guarantee to a future where the producer either errors or completes, it makes sense to send the AbortError down the rejection path in this case.
Advantages
- AbortSignal is an EventTarget, and EventTarget may soon have an on method that returns an observable. You could use abortSignal.on('abort') to compose teardown if you so choose.
Migration path
- We'll start using AbortSignal internally throughout the library.
- … subscribe that accepts an Observer and an AbortSignal. If AbortSignal is passed, it does not return a Subscription.
- … Subscription into an AbortController
- … AbortSignal.
- … unsubscribe as abort.
- … Observable constructor receive an AbortSignal.
- … AbortSignal.
- … start method I just added. :) lol It's no longer necessary.
- … abort method that accepts AbortError.
- … forEach get sent as rejections of type AbortError to the returned Promise.
- … toPromise accept an abort signal
- … subscribe(fn, fn, fn), because object literals are more readable anyhow, and forEach can be used for the general case of just needing next. Think subscribe(null, (err) => doAThing(err)) vs subscribe({ error: doAThing })
NOTE: The current TC39 proposal may not reflect this yet
cc/ @jhusain @trxcllnt @kwonoj @mattpodwysocki
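As a rough illustration of the shape proposed in this issue, the sketch below has `subscribe` take an observer plus an optional `AbortSignal` and return nothing, while the producer receives a safe observer and a signal. `SketchObservable`, the exact observer fields, the internal `AbortController`, and the plain `Error` named "AbortError" are all assumptions made for this example. None of it is a shipped RxJS or TC39 API.

```typescript
// Hypothetical shapes based on the proposal text; not a shipped API.
type Observer<T> = {
  next?(value: T): void;
  error?(err: unknown): void;
  complete?(): void;
  abort?(reason: Error): void; // assumed: receives the AbortError
};
type SafeObserver<T> = {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
};

class SketchObservable<T> {
  private readonly init: (observer: SafeObserver<T>, signal: AbortSignal) => void;

  constructor(init: (observer: SafeObserver<T>, signal: AbortSignal) => void) {
    this.init = init;
  }

  // Returns void: cancellation goes through the signal, not a Subscription.
  subscribe(observer: Observer<T>, signal?: AbortSignal): void {
    const makeAbortError = () => {
      const err = new Error("Subscription aborted");
      err.name = "AbortError";
      return err;
    };
    if (signal?.aborted) {
      observer.abort?.(makeAbortError());
      return;
    }
    // Assumption: the producer gets an internal signal, so its teardown
    // also runs when the stream errors or completes on its own.
    const internal = new AbortController();
    let closed = false;
    const close = () => {
      closed = true;
      internal.abort();
    };
    signal?.addEventListener(
      "abort",
      () => {
        if (closed) return;
        close();
        observer.abort?.(makeAbortError());
      },
      { once: true }
    );
    this.init(
      {
        next: (value) => { if (!closed) observer.next?.(value); },
        error: (err) => { if (!closed) { close(); observer.error?.(err); } },
        complete: () => { if (!closed) { close(); observer.complete?.(); } },
      },
      internal.signal
    );
  }
}
```

In this sketch the consumer cancels by calling `abort()` on its own `AbortController`, which runs the producer's teardown and then the optional `abort` handler.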