From 5aff5e86093ce65c23dd6b98f2518659dc0d6ddc Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 17 Oct 2015 13:15:34 +0300 Subject: [PATCH] fix(retryWhen): fix internal unsubscriptions Reimplement retryWhen operator. Fix retryWhen to tear down resources (internal retried subscriptions and the subscription to the notifier Observable) whenever the result is unsubscribed. --- src/operators/retryWhen.ts | 101 +++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 21 deletions(-) diff --git a/src/operators/retryWhen.ts b/src/operators/retryWhen.ts index f67e33f444..2e780a3aa8 100644 --- a/src/operators/retryWhen.ts +++ b/src/operators/retryWhen.ts @@ -14,49 +14,108 @@ export default function retryWhen(notifier: (errors: Observable) => Obse class RetryWhenOperator implements Operator { constructor(protected notifier: (errors: Observable) => Observable, - protected original: Observable) { + protected source: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RetryWhenSubscriber(subscriber, this.notifier, this.original); + return new FirstRetryWhenSubscriber(subscriber, this.notifier, this.source); } } -class RetryWhenSubscriber extends Subscriber { +class FirstRetryWhenSubscriber extends Subscriber { + lastSubscription: Subscription; + notificationSubscription: Subscription; errors: Subject; retryNotifications: Observable; - constructor(destination: Subscriber, + constructor(public destination: Subscriber, public notifier: (errors: Observable) => Observable, - public original: Observable) { - super(destination); + public source: Observable) { + super(null); + this.lastSubscription = this; } - _error(err: any) { - if (!this.retryNotifications) { - this.errors = new Subject(); - const notifications = tryCatch(this.notifier).call(this, this.errors); - if (notifications === errorObject) { - this.destination.error(errorObject.e); - } else { - this.retryNotifications = notifications; - this.add(notifications._subscribe(new RetryNotificationSubscriber(this))); + _next(value: T) { + this.destination.next(value); + } + + error(err?) { + if (!this.isUnsubscribed) { + super.unsubscribe(); + if (!this.retryNotifications) { + this.errors = new Subject(); + const notifications = tryCatch(this.notifier).call(this, this.errors); + if (notifications === errorObject) { + this.destination.error(errorObject.e); + } else { + this.retryNotifications = notifications; + const notificationSubscriber = new RetryNotificationSubscriber(this); + this.notificationSubscription = notifications.subscribe(notificationSubscriber); + } } + this.errors.next(err); } - this.errors.next(err); } - finalError(err: any) { + destinationError(err: any) { + this.tearDown(); this.destination.error(err); } + _complete() { + this.destinationComplete(); + } + + destinationComplete() { + this.tearDown(); + this.destination.complete(); + } + + unsubscribe() { + const lastSubscription = this.lastSubscription; + if (lastSubscription === this) { + super.unsubscribe(); + } else { + this.tearDown(); + } + } + + tearDown() { + super.unsubscribe(); + this.lastSubscription.unsubscribe(); + const notificationSubscription = this.notificationSubscription; + if (notificationSubscription) { + notificationSubscription.unsubscribe(); + } + } + resubscribe() { - this.original.subscribe(this); + this.lastSubscription.unsubscribe(); + const nextSubscriber = new MoreRetryWhenSubscriber(this); + this.lastSubscription = this.source.subscribe(nextSubscriber); + } +} + +class MoreRetryWhenSubscriber extends Subscriber { + constructor(private parent: FirstRetryWhenSubscriber) { + super(null); + } + + _next(value: T) { + this.parent.destination.next(value); + } + + _error(err: any) { + this.parent.errors.next(err); + } + + _complete() { + this.parent.destinationComplete(); } } class RetryNotificationSubscriber extends Subscriber { - constructor(public parent: RetryWhenSubscriber) { + constructor(public parent: FirstRetryWhenSubscriber) { super(null); } @@ -65,10 +124,10 @@ class RetryNotificationSubscriber extends Subscriber { } _error(err: any) { - this.parent.finalError(err); + this.parent.destinationError(err); } _complete() { - this.parent.complete(); + this.parent.destinationComplete(); } } \ No newline at end of file