Skip to content

Commit

Permalink
Avoid using separate shadowObservers Set for cleanup observers.
Browse files Browse the repository at this point in the history
When there are multiple observers subscribed to a Concast, messages should
be delivered in the order the observers were added.

The shadowObservers approach notified any cleanup observers after any
normal observers, potentially reordereding message delivery. I'm not sure
that reordering was necessarily "wrong," but it would be a very tricky
problem to track down for anyone who might be affected by it.

However, if any other non-cleanup observers have been added, and the last
observer removed is a cleanup observer, then we should unsubscribe from
this.sub, to preserve the fix for apollographql#6985.
  • Loading branch information
benjamn committed Oct 14, 2020
1 parent c6d58db commit b71a6b8
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ export class Concast<T> extends Observable<T> {
// recent message, though possibly at different times in the past.
private observers = new Set<Observer<T>>();

// Shadow observers awaiting broadcast messages. These observers are
// subordinated to this.observers and should not prevent unsubscribing
// from this.sub.
private shadowObservers = new Set<Observer<T>>();

// This property starts off undefined to indicate the initial
// subscription has not yet begun, then points to each source
// subscription in turn, and finally becomes null after the sources have
Expand Down Expand Up @@ -100,7 +95,7 @@ export class Concast<T> extends Observable<T> {
this.handlers.complete!();
}

private deliverLastMessage (observer: Observer<T>) {
private deliverLastMessage(observer: Observer<T>) {
if (this.latest) {
const nextOrError = this.latest[0];
const method = observer[nextOrError];
Expand All @@ -118,12 +113,16 @@ export class Concast<T> extends Observable<T> {
}
}

// Note: cleanup observers do not count towards this total.
private addCount = 0;

public addObserver(observer: Observer<T>) {
if (!this.observers.has(observer)) {
// Immediately deliver the most recent message, so we can always
// be sure all observers have the latest information.
this.deliverLastMessage(observer);
this.observers.add(observer);
++this.addCount;
}
}

Expand All @@ -136,11 +135,11 @@ export class Concast<T> extends Observable<T> {
if (quietly) return;
if (this.sub) {
this.sub.unsubscribe();
this.sub = null;
// In case anyone happens to be listening to this.promise, after
// this.observers has become empty.
this.reject(new Error("Observable cancelled prematurely"));
}
this.sub = null;
}
}

Expand All @@ -165,7 +164,6 @@ export class Concast<T> extends Observable<T> {
if (this.sub !== null) {
this.latest = ["next", result];
iterateObserversSafely(this.observers, "next", result);
iterateObserversSafely(this.shadowObservers, "next", result);
}
},

Expand All @@ -176,7 +174,6 @@ export class Concast<T> extends Observable<T> {
this.latest = ["error", error];
this.reject(error);
iterateObserversSafely(this.observers, "error", error);
iterateObserversSafely(this.shadowObservers, "error", error);
}
},

Expand All @@ -198,7 +195,6 @@ export class Concast<T> extends Observable<T> {
// 'next' message (unless there was an error) immediately
// followed by a 'complete' message (see addObserver).
iterateObserversSafely(this.observers, "complete");
iterateObserversSafely(this.shadowObservers, "complete");
} else if (isPromiseLike(value)) {
value.then(obs => this.sub = obs.subscribe(this.handlers));
} else {
Expand All @@ -213,7 +209,13 @@ export class Concast<T> extends Observable<T> {
const once = () => {
if (!called) {
called = true;
this.shadowObservers.delete(observer);
// If there have been no other (non-cleanup) observers added, pass
// true for the quietly argument, so the removal of the cleanup
// observer does not call this.sub.unsubscribe. If a cleanup
// observer is added and removed before any other observers
// subscribe, we do not want to prevent other observers from
// subscribing later.
this.removeObserver(observer, !this.addCount);
callback();
}
}
Expand All @@ -222,11 +224,12 @@ export class Concast<T> extends Observable<T> {
error: once,
complete: once,
};

// Immediately deliver the most recent message, so we can always
// be sure all observers have the latest information.
this.deliverLastMessage(observer);
this.shadowObservers.add(observer);
const count = this.addCount;
this.addObserver(observer);
// Normally addObserver increments this.addCount, but we can "hide"
// cleanup observers by restoring this.addCount to its previous value
// after adding any cleanup observer.
this.addCount = count;
}

// A public way to abort observation and broadcast.
Expand Down

0 comments on commit b71a6b8

Please sign in to comment.