From 2dd50601701289f897452ffa1fdc3aa68d5d4058 Mon Sep 17 00:00:00 2001 From: Tobias Walle Date: Tue, 17 May 2022 20:17:33 +0200 Subject: [PATCH 1/5] fix(utils): fix memory leaks and other issues in atomWithObservable --- docs/utils/atom-with-observable.mdx | 12 + src/utils/atomWithObservable.ts | 109 ++++--- tests/utils/atomWithObservable.test.tsx | 403 +++++++++++++++++++++--- 3 files changed, 436 insertions(+), 88 deletions(-) diff --git a/docs/utils/atom-with-observable.mdx b/docs/utils/atom-with-observable.mdx index d6bacd71a8..fe46df3ae5 100644 --- a/docs/utils/atom-with-observable.mdx +++ b/docs/utils/atom-with-observable.mdx @@ -40,6 +40,18 @@ const counterAtom2 = atomWithObservable(() => counterSubject, { }) ``` +## Timeout + +To prevent memory leaks, the subscription to the observable is cancelled if no initial value was omitted after ten seconds. +You can change this value via the `timeout` option. +You can disable the `timeout` by setting the value to `false`. + +```ts +const counterAtomWithOneSecondTimeout = atomWithObservable(() => counterSubject, {timeout: 1000}); + +const counterAtomWithoutTimeout = atomWithObservable(() => counterSubject, {timeout: false}); +``` + ### Codesandbox diff --git a/src/utils/atomWithObservable.ts b/src/utils/atomWithObservable.ts index da9ac97d46..af499e28b8 100644 --- a/src/utils/atomWithObservable.ts +++ b/src/utils/atomWithObservable.ts @@ -33,6 +33,7 @@ type InitialValueFunction = () => T | undefined type AtomWithObservableOptions = { initialValue?: TData | InitialValueFunction + timeout?: number | false } export function atomWithObservable( @@ -56,24 +57,80 @@ export function atomWithObservable( observable = itself } - const dataAtom = atom( - options?.initialValue - ? getInitialValue(options) - : firstValueFrom(observable) - ) - let setData: (data: TData | Promise) => void = () => { - throw new Error('setting data without mount') + // To differentiate beetwen no value was emitted and `undefined` was emitted, + // this symbol is used + const NotEmitted = Symbol() + type NotEmitted = typeof NotEmitted + + let resolveEmittedInitialValue: + | ((data: TData | Promise) => void) + | null = null + let initialEmittedValue: Promise | TData | undefined = + options?.initialValue === undefined + ? new Promise((resolve) => { + resolveEmittedInitialValue = resolve + }) + : undefined + let initialValueWasEmitted = false + let initialEmittedValueTimer: NodeJS.Timeout | null = null + + let emittedValueBeforeMount: TData | Promise | NotEmitted = + NotEmitted + let isSync = true + let setData: (data: TData | Promise) => void = (data) => { + // First we set the initial value (if not other initialValue was provided) + // All the following data is saved in a variable so it doesn't get lost before the mount + if (options?.initialValue === undefined && !initialValueWasEmitted) { + if (isSync) { + initialEmittedValue = data + } + resolveEmittedInitialValue?.(data) + initialValueWasEmitted = true + resolveEmittedInitialValue = null + if (initialEmittedValueTimer) clearTimeout(initialEmittedValueTimer) + } else { + emittedValueBeforeMount = data + } } + const dataListener = (data: TData) => { setData(data) } + const errorListener = (error: unknown) => { setData(Promise.reject(error)) } + let subscription: Subscription | null = null + let initialValue: + | TData + | Promise + | InitialValueFunction + | undefined + let isMounted = false + if (options?.initialValue !== undefined) { + initialValue = getInitialValue(options) + } else { + subscription = observable.subscribe(dataListener, errorListener) + isSync = false + // Unsubscribe after an timeout, in case the `onMount` method was never called + // and the subscription is still pending + if (options?.timeout !== false) { + initialEmittedValueTimer = setTimeout(() => { + initialEmittedValueTimer = null + if (!isMounted) subscription?.unsubscribe() + }, options?.timeout ?? 10_000) + } + initialValue = initialEmittedValue + } + const dataAtom = atom(initialValue) dataAtom.onMount = (update) => { + isMounted = true setData = update + if (emittedValueBeforeMount !== NotEmitted) { + update(emittedValueBeforeMount) + } if (!subscription) { subscription = observable.subscribe(dataListener, errorListener) } @@ -82,6 +139,7 @@ export function atomWithObservable( subscription = null } } + return { dataAtom, observable } }) const observableAtom = atom( @@ -106,40 +164,3 @@ function getInitialValue(options: AtomWithObservableOptions) { const initialValue = options.initialValue return initialValue instanceof Function ? initialValue() : initialValue } - -// FIXME There are two fatal issues in the current implememtation. -// See also: https://github.com/pmndrs/jotai/pull/1058 -// - There's a risk of memory leaks. -// Unless the source emit a new value, -// the subscription will never be destroyed. -// atom `read` function can be called multiple times without mounting. -// This issue has existed even before #1058. -// - The second value before mounting the atom is dropped. -// There's no guarantee that `onMount` is invoked in a short period. -// So, by the time we invoke `subscribe`, the value can be changed. -// Before #1058, an error was thrown, but currently it's silently dropped. -function firstValueFrom(source: ObservableLike): Promise { - return new Promise((resolve, reject) => { - let resolved = false - let isSync = true - const subscription = source.subscribe({ - next: (value) => { - resolve(value) - resolved = true - if (!isSync) { - subscription.unsubscribe() - } - }, - error: reject, - complete: () => { - reject() - }, - }) - isSync = false - - if (resolved) { - // If subscription was resolved synchronously - subscription.unsubscribe() - } - }) -} diff --git a/tests/utils/atomWithObservable.test.tsx b/tests/utils/atomWithObservable.test.tsx index 274006b5b5..2635a223c0 100644 --- a/tests/utils/atomWithObservable.test.tsx +++ b/tests/utils/atomWithObservable.test.tsx @@ -1,19 +1,41 @@ -import { ReactElement, Suspense, useState } from 'react' -import { act, fireEvent, render } from '@testing-library/react' -import { BehaviorSubject, Observable, ReplaySubject, Subject } from 'rxjs' -import { useAtom } from 'jotai' +import { Component, ReactElement, ReactNode, Suspense, useState } from 'react' +import { act, fireEvent, render, waitFor } from '@testing-library/react' +import { + BehaviorSubject, + Observable, + ReplaySubject, + Subject, + delay, + of, +} from 'rxjs' +import { useAtom, useAtomValue, useSetAtom } from 'jotai' import { atomWithObservable } from 'jotai/utils' import { getTestProvider } from '../testUtils' const Provider = getTestProvider() +class ErrorBoundary extends Component< + { children: ReactNode }, + { error: string } +> { + state = { + error: '', + } + + static getDerivedStateFromError(error: Error) { + return { error: error.message } + } + + render() { + if (this.state.error) { + return
Error: {this.state.error}
+ } + return this.props.children + } +} + it('count state', async () => { - const observableAtom = atomWithObservable( - () => - new Observable((subscriber) => { - subscriber.next(1) - }) - ) + const observableAtom = atomWithObservable(() => of(1)) const Counter = () => { const [state] = useAtom(observableAtom) @@ -33,15 +55,81 @@ it('count state', async () => { }) it('writable count state', async () => { + const subject = new BehaviorSubject(1) + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state, dispatch] = useAtom(observableAtom) + + return ( + <> + count: {state} + + + ) + } + + const { findByText, getByText } = render( + + + + + + ) + + await findByText('count: 1') + + act(() => subject.next(2)) + await findByText('count: 2') + + fireEvent.click(getByText('button')) + await findByText('count: 9') + expect(subject.value).toBe(9) + + expect(subject) +}) + +it('writable count state without initial value', async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const CounterValue = () => { + const state = useAtomValue(observableAtom) + + return <>count: {state} + } + + const CounterButton = () => { + const dispatch = useSetAtom(observableAtom) + + return + } + + const { findByText, getByText } = render( + + + + + + + + + ) + + await findByText('loading') + + fireEvent.click(getByText('button')) + await findByText('count: 9') + + act(() => subject.next(3)) + await findByText('count: 3') +}) + +it('writable count state with delayed value', async () => { + const subject = new Subject() const observableAtom = atomWithObservable(() => { - const observable = new Observable((subscriber) => { - subscriber.next(1) - }) - const subject = new ReplaySubject() - // is this usual to delay the subscription? - setTimeout(() => { - observable.subscribe(subject) - }, 100) + const observable = of(1).pipe(delay(500)) + observable.subscribe((n) => subject.next(n)) return subject }) @@ -51,7 +139,12 @@ it('writable count state', async () => { return ( <> count: {state} - + ) } @@ -71,6 +164,118 @@ it('writable count state', async () => { await findByText('count: 9') }) +it('only subscribe once per atom', async () => { + const subject = new Subject() + let totalSubscriptions = 0 + const observable = new Observable((subscriber) => { + totalSubscriptions++ + subject.subscribe(subscriber) + }) + const observableAtom = atomWithObservable(() => observable) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText, rerender } = render( + + + + + + ) + await findByText('loading') + act(() => subject.next(1)) + await findByText('count: 1') + + rerender(
) + expect(totalSubscriptions).toEqual(1) + + rerender( + + + + + + ) + await findByText('loading') + act(() => subject.next(2)) + await findByText('count: 2') + + expect(totalSubscriptions).toEqual(2) +}) + +it('cleanup subscription', async () => { + const subject = new Subject() + let activeSubscriptions = 0 + const observable = new Observable((subscriber) => { + activeSubscriptions++ + subject.subscribe(subscriber) + return () => { + activeSubscriptions-- + } + }) + const observableAtom = atomWithObservable(() => observable) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText, rerender } = render( + + + + + + ) + + await findByText('loading') + + act(() => subject.next(1)) + await findByText('count: 1') + + expect(activeSubscriptions).toEqual(1) + rerender(
) + await waitFor(() => expect(activeSubscriptions).toEqual(0)) +}) + +it('cleanup subscriptions after timeout if initial value never emits', async () => { + const subject = new Subject() + let subscriptions = 0 + const observable = new Observable((subscriber) => { + subscriptions++ + subject.subscribe(subscriber) + return () => { + subscriptions-- + } + }) + const observableAtom = atomWithObservable(() => observable, { timeout: 500 }) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText, rerender } = render( + + + + + + ) + + await findByText('loading') + expect(subscriptions).toEqual(1) + + rerender(
) + await waitFor(() => expect(subscriptions).toEqual(0)) +}) + it('resubscribe on remount', async () => { const subject = new Subject() const observableAtom = atomWithObservable(() => subject) @@ -136,20 +341,8 @@ it("count state with initialValue doesn't suspend", async () => { }) it('writable count state with initialValue', async () => { - const observableAtom = atomWithObservable( - () => { - const observable = new Observable((subscriber) => { - subscriber.next(1) - }) - const subject = new Subject() - // is this usual to delay the subscription? - setTimeout(() => { - observable.subscribe(subject) - }, 100) - return subject - }, - { initialValue: 5 } - ) + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject, { initialValue: 5 }) const Counter = () => { const [state, dispatch] = useAtom(observableAtom) @@ -171,22 +364,83 @@ it('writable count state with initialValue', async () => { ) await findByText('count: 5') - + act(() => subject.next(1)) await findByText('count: 1') fireEvent.click(getByText('button')) await findByText('count: 9') }) -it('with initial value and synchronous subscription', async () => { - const observableAtom = atomWithObservable( - () => - new Observable((subscriber) => { - subscriber.next(1) - }), - { initialValue: 5 } +it('writable count state with error', async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state, dispatch] = useAtom(observableAtom) + + return ( + <> + count: {state} + + + ) + } + + const { findByText } = render( + + + + + + + + ) + + await findByText('loading') + + act(() => subject.error(new Error('Test Error'))) + findByText('Error: Test Error') +}) + +it('synchronous subscription with initial value', async () => { + const observableAtom = atomWithObservable(() => of(1), { initialValue: 5 }) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText } = render( + + + + ) + + await findByText('count: 1') +}) + +it('synchronous subscription with BehaviorSubject', async () => { + const observableAtom = atomWithObservable(() => new BehaviorSubject(1)) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText } = render( + + + ) + await findByText('count: 1') +}) + +it('synchronous subscription with already emitted value', async () => { + const observableAtom = atomWithObservable(() => of(1)) + const Counter = () => { const [state] = useAtom(observableAtom) @@ -202,9 +456,10 @@ it('with initial value and synchronous subscription', async () => { await findByText('count: 1') }) -it('behaviour subject', async () => { - const subject$ = new BehaviorSubject(1) - const observableAtom = atomWithObservable(() => subject$) +it('with falsy initial value', async () => { + const observableAtom = atomWithObservable(() => new Subject(), { + initialValue: 0, + }) const Counter = () => { const [state] = useAtom(observableAtom) @@ -212,6 +467,25 @@ it('behaviour subject', async () => { return <>count: {state} } + const { findByText } = render( + + + + ) + + await findByText('count: 0') +}) + +it('with initially emitted undefined value', async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state === undefined ? '-' : state} + } + const { findByText } = render( @@ -220,5 +494,46 @@ it('behaviour subject', async () => { ) + await findByText('loading') + act(() => subject.next(undefined)) + await findByText('count: -') + act(() => subject.next(1)) await findByText('count: 1') }) + +it("don't omit values emitted between init and mount", async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state, dispatch] = useAtom(observableAtom) + + return ( + <> + count: {state} + + + ) + } + + const { findByText, getByText } = render( + + + + + + ) + + await findByText('loading') + act(() => subject.next(1)) + act(() => subject.next(2)) + await findByText('count: 2') + + fireEvent.click(getByText('button')) + await findByText('count: 9') +}) From 504599139e5513e56470945571d69dc1c7483b66 Mon Sep 17 00:00:00 2001 From: Tobias Walle Date: Thu, 19 May 2022 17:28:31 +0200 Subject: [PATCH 2/5] remove `timeout` option --- docs/utils/atom-with-observable.mdx | 12 --------- src/utils/atomWithObservable.ts | 13 ---------- tests/utils/atomWithObservable.test.tsx | 33 ------------------------- 3 files changed, 58 deletions(-) diff --git a/docs/utils/atom-with-observable.mdx b/docs/utils/atom-with-observable.mdx index fe46df3ae5..d6bacd71a8 100644 --- a/docs/utils/atom-with-observable.mdx +++ b/docs/utils/atom-with-observable.mdx @@ -40,18 +40,6 @@ const counterAtom2 = atomWithObservable(() => counterSubject, { }) ``` -## Timeout - -To prevent memory leaks, the subscription to the observable is cancelled if no initial value was omitted after ten seconds. -You can change this value via the `timeout` option. -You can disable the `timeout` by setting the value to `false`. - -```ts -const counterAtomWithOneSecondTimeout = atomWithObservable(() => counterSubject, {timeout: 1000}); - -const counterAtomWithoutTimeout = atomWithObservable(() => counterSubject, {timeout: false}); -``` - ### Codesandbox diff --git a/src/utils/atomWithObservable.ts b/src/utils/atomWithObservable.ts index af499e28b8..abd9da82e4 100644 --- a/src/utils/atomWithObservable.ts +++ b/src/utils/atomWithObservable.ts @@ -33,7 +33,6 @@ type InitialValueFunction = () => T | undefined type AtomWithObservableOptions = { initialValue?: TData | InitialValueFunction - timeout?: number | false } export function atomWithObservable( @@ -72,7 +71,6 @@ export function atomWithObservable( }) : undefined let initialValueWasEmitted = false - let initialEmittedValueTimer: NodeJS.Timeout | null = null let emittedValueBeforeMount: TData | Promise | NotEmitted = NotEmitted @@ -87,7 +85,6 @@ export function atomWithObservable( resolveEmittedInitialValue?.(data) initialValueWasEmitted = true resolveEmittedInitialValue = null - if (initialEmittedValueTimer) clearTimeout(initialEmittedValueTimer) } else { emittedValueBeforeMount = data } @@ -107,26 +104,16 @@ export function atomWithObservable( | Promise | InitialValueFunction | undefined - let isMounted = false if (options?.initialValue !== undefined) { initialValue = getInitialValue(options) } else { subscription = observable.subscribe(dataListener, errorListener) isSync = false - // Unsubscribe after an timeout, in case the `onMount` method was never called - // and the subscription is still pending - if (options?.timeout !== false) { - initialEmittedValueTimer = setTimeout(() => { - initialEmittedValueTimer = null - if (!isMounted) subscription?.unsubscribe() - }, options?.timeout ?? 10_000) - } initialValue = initialEmittedValue } const dataAtom = atom(initialValue) dataAtom.onMount = (update) => { - isMounted = true setData = update if (emittedValueBeforeMount !== NotEmitted) { update(emittedValueBeforeMount) diff --git a/tests/utils/atomWithObservable.test.tsx b/tests/utils/atomWithObservable.test.tsx index 2635a223c0..523a78caf0 100644 --- a/tests/utils/atomWithObservable.test.tsx +++ b/tests/utils/atomWithObservable.test.tsx @@ -243,39 +243,6 @@ it('cleanup subscription', async () => { await waitFor(() => expect(activeSubscriptions).toEqual(0)) }) -it('cleanup subscriptions after timeout if initial value never emits', async () => { - const subject = new Subject() - let subscriptions = 0 - const observable = new Observable((subscriber) => { - subscriptions++ - subject.subscribe(subscriber) - return () => { - subscriptions-- - } - }) - const observableAtom = atomWithObservable(() => observable, { timeout: 500 }) - - const Counter = () => { - const [state] = useAtom(observableAtom) - - return <>count: {state} - } - - const { findByText, rerender } = render( - - - - - - ) - - await findByText('loading') - expect(subscriptions).toEqual(1) - - rerender(
) - await waitFor(() => expect(subscriptions).toEqual(0)) -}) - it('resubscribe on remount', async () => { const subject = new Subject() const observableAtom = atomWithObservable(() => subject) From 3abbe505a0d85075844083d0d0fd952df56cce2a Mon Sep 17 00:00:00 2001 From: Tobias Walle Date: Thu, 19 May 2022 17:42:08 +0200 Subject: [PATCH 3/5] fix tests --- tests/utils/atomWithObservable.test.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/utils/atomWithObservable.test.tsx b/tests/utils/atomWithObservable.test.tsx index 523a78caf0..a0ea0f5e05 100644 --- a/tests/utils/atomWithObservable.test.tsx +++ b/tests/utils/atomWithObservable.test.tsx @@ -200,7 +200,6 @@ it('only subscribe once per atom', async () => { ) - await findByText('loading') act(() => subject.next(2)) await findByText('count: 2') From f8259c4155e5fc0c514b9f0f6906d0c71f06f342 Mon Sep 17 00:00:00 2001 From: Tobias Walle Date: Fri, 20 May 2022 07:34:27 +0200 Subject: [PATCH 4/5] fix review findings --- src/utils/atomWithObservable.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/utils/atomWithObservable.ts b/src/utils/atomWithObservable.ts index abd9da82e4..4021d36f70 100644 --- a/src/utils/atomWithObservable.ts +++ b/src/utils/atomWithObservable.ts @@ -58,8 +58,7 @@ export function atomWithObservable( // To differentiate beetwen no value was emitted and `undefined` was emitted, // this symbol is used - const NotEmitted = Symbol() - type NotEmitted = typeof NotEmitted + const EMPTY = Symbol() let resolveEmittedInitialValue: | ((data: TData | Promise) => void) @@ -72,8 +71,7 @@ export function atomWithObservable( : undefined let initialValueWasEmitted = false - let emittedValueBeforeMount: TData | Promise | NotEmitted = - NotEmitted + let emittedValueBeforeMount: TData | Promise | typeof EMPTY = EMPTY let isSync = true let setData: (data: TData | Promise) => void = (data) => { // First we set the initial value (if not other initialValue was provided) @@ -108,14 +106,14 @@ export function atomWithObservable( initialValue = getInitialValue(options) } else { subscription = observable.subscribe(dataListener, errorListener) - isSync = false initialValue = initialEmittedValue } + isSync = false const dataAtom = atom(initialValue) dataAtom.onMount = (update) => { setData = update - if (emittedValueBeforeMount !== NotEmitted) { + if (emittedValueBeforeMount !== EMPTY) { update(emittedValueBeforeMount) } if (!subscription) { From ed10d98112fa675f5d6dcd7358aa06e49cb135a3 Mon Sep 17 00:00:00 2001 From: Tobias Walle Date: Mon, 23 May 2022 17:52:37 +0200 Subject: [PATCH 5/5] add comment to inform about potential memory leak --- src/utils/atomWithObservable.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/utils/atomWithObservable.ts b/src/utils/atomWithObservable.ts index 4021d36f70..4d3896b036 100644 --- a/src/utils/atomWithObservable.ts +++ b/src/utils/atomWithObservable.ts @@ -105,6 +105,16 @@ export function atomWithObservable( if (options?.initialValue !== undefined) { initialValue = getInitialValue(options) } else { + // FIXME + // There is the potential for memory leaks in this implementation. + // + // If the observable doesn't emit an initial value before the component that uses the atom gets destroyed, + // the onMount function never gets called and therefore the subscription never gets cleaned up. + // + // Unfortunately, currently there is no good way to prevent this issue (as of 2022-05-23). + // Timeouts may lead to an endless loading state, if the subscription get's cleaned up too quickly. + // + // Discussion: https://github.com/pmndrs/jotai/pull/1170 subscription = observable.subscribe(dataListener, errorListener) initialValue = initialEmittedValue }