diff --git a/src/utils/atomWithObservable.ts b/src/utils/atomWithObservable.ts index da9ac97d46..4d3896b036 100644 --- a/src/utils/atomWithObservable.ts +++ b/src/utils/atomWithObservable.ts @@ -56,24 +56,76 @@ export function atomWithObservable( observable = itself } - const dataAtom = atom( - options?.initialValue - ? getInitialValue(options) - : firstValueFrom(observable) - ) - let setData: (data: TData | Promise) => void = () => { - throw new Error('setting data without mount') + // To differentiate beetwen no value was emitted and `undefined` was emitted, + // this symbol is used + const EMPTY = Symbol() + + let resolveEmittedInitialValue: + | ((data: TData | Promise) => void) + | null = null + let initialEmittedValue: Promise | TData | undefined = + options?.initialValue === undefined + ? new Promise((resolve) => { + resolveEmittedInitialValue = resolve + }) + : undefined + let initialValueWasEmitted = false + + let emittedValueBeforeMount: TData | Promise | typeof EMPTY = EMPTY + let isSync = true + let setData: (data: TData | Promise) => void = (data) => { + // First we set the initial value (if not other initialValue was provided) + // All the following data is saved in a variable so it doesn't get lost before the mount + if (options?.initialValue === undefined && !initialValueWasEmitted) { + if (isSync) { + initialEmittedValue = data + } + resolveEmittedInitialValue?.(data) + initialValueWasEmitted = true + resolveEmittedInitialValue = null + } else { + emittedValueBeforeMount = data + } } + const dataListener = (data: TData) => { setData(data) } + const errorListener = (error: unknown) => { setData(Promise.reject(error)) } + let subscription: Subscription | null = null + let initialValue: + | TData + | Promise + | InitialValueFunction + | undefined + if (options?.initialValue !== undefined) { + initialValue = getInitialValue(options) + } else { + // FIXME + // There is the potential for memory leaks in this implementation. + // + // If the observable doesn't emit an initial value before the component that uses the atom gets destroyed, + // the onMount function never gets called and therefore the subscription never gets cleaned up. + // + // Unfortunately, currently there is no good way to prevent this issue (as of 2022-05-23). + // Timeouts may lead to an endless loading state, if the subscription get's cleaned up too quickly. + // + // Discussion: https://github.com/pmndrs/jotai/pull/1170 + subscription = observable.subscribe(dataListener, errorListener) + initialValue = initialEmittedValue + } + isSync = false + const dataAtom = atom(initialValue) dataAtom.onMount = (update) => { setData = update + if (emittedValueBeforeMount !== EMPTY) { + update(emittedValueBeforeMount) + } if (!subscription) { subscription = observable.subscribe(dataListener, errorListener) } @@ -82,6 +134,7 @@ export function atomWithObservable( subscription = null } } + return { dataAtom, observable } }) const observableAtom = atom( @@ -106,40 +159,3 @@ function getInitialValue(options: AtomWithObservableOptions) { const initialValue = options.initialValue return initialValue instanceof Function ? initialValue() : initialValue } - -// FIXME There are two fatal issues in the current implememtation. -// See also: https://github.com/pmndrs/jotai/pull/1058 -// - There's a risk of memory leaks. -// Unless the source emit a new value, -// the subscription will never be destroyed. -// atom `read` function can be called multiple times without mounting. -// This issue has existed even before #1058. -// - The second value before mounting the atom is dropped. -// There's no guarantee that `onMount` is invoked in a short period. -// So, by the time we invoke `subscribe`, the value can be changed. -// Before #1058, an error was thrown, but currently it's silently dropped. -function firstValueFrom(source: ObservableLike): Promise { - return new Promise((resolve, reject) => { - let resolved = false - let isSync = true - const subscription = source.subscribe({ - next: (value) => { - resolve(value) - resolved = true - if (!isSync) { - subscription.unsubscribe() - } - }, - error: reject, - complete: () => { - reject() - }, - }) - isSync = false - - if (resolved) { - // If subscription was resolved synchronously - subscription.unsubscribe() - } - }) -} diff --git a/tests/utils/atomWithObservable.test.tsx b/tests/utils/atomWithObservable.test.tsx index 274006b5b5..a0ea0f5e05 100644 --- a/tests/utils/atomWithObservable.test.tsx +++ b/tests/utils/atomWithObservable.test.tsx @@ -1,19 +1,41 @@ -import { ReactElement, Suspense, useState } from 'react' -import { act, fireEvent, render } from '@testing-library/react' -import { BehaviorSubject, Observable, ReplaySubject, Subject } from 'rxjs' -import { useAtom } from 'jotai' +import { Component, ReactElement, ReactNode, Suspense, useState } from 'react' +import { act, fireEvent, render, waitFor } from '@testing-library/react' +import { + BehaviorSubject, + Observable, + ReplaySubject, + Subject, + delay, + of, +} from 'rxjs' +import { useAtom, useAtomValue, useSetAtom } from 'jotai' import { atomWithObservable } from 'jotai/utils' import { getTestProvider } from '../testUtils' const Provider = getTestProvider() +class ErrorBoundary extends Component< + { children: ReactNode }, + { error: string } +> { + state = { + error: '', + } + + static getDerivedStateFromError(error: Error) { + return { error: error.message } + } + + render() { + if (this.state.error) { + return
Error: {this.state.error}
+ } + return this.props.children + } +} + it('count state', async () => { - const observableAtom = atomWithObservable( - () => - new Observable((subscriber) => { - subscriber.next(1) - }) - ) + const observableAtom = atomWithObservable(() => of(1)) const Counter = () => { const [state] = useAtom(observableAtom) @@ -33,15 +55,81 @@ it('count state', async () => { }) it('writable count state', async () => { + const subject = new BehaviorSubject(1) + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state, dispatch] = useAtom(observableAtom) + + return ( + <> + count: {state} + + + ) + } + + const { findByText, getByText } = render( + + + + + + ) + + await findByText('count: 1') + + act(() => subject.next(2)) + await findByText('count: 2') + + fireEvent.click(getByText('button')) + await findByText('count: 9') + expect(subject.value).toBe(9) + + expect(subject) +}) + +it('writable count state without initial value', async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const CounterValue = () => { + const state = useAtomValue(observableAtom) + + return <>count: {state} + } + + const CounterButton = () => { + const dispatch = useSetAtom(observableAtom) + + return + } + + const { findByText, getByText } = render( + + + + + + + + + ) + + await findByText('loading') + + fireEvent.click(getByText('button')) + await findByText('count: 9') + + act(() => subject.next(3)) + await findByText('count: 3') +}) + +it('writable count state with delayed value', async () => { + const subject = new Subject() const observableAtom = atomWithObservable(() => { - const observable = new Observable((subscriber) => { - subscriber.next(1) - }) - const subject = new ReplaySubject() - // is this usual to delay the subscription? - setTimeout(() => { - observable.subscribe(subject) - }, 100) + const observable = of(1).pipe(delay(500)) + observable.subscribe((n) => subject.next(n)) return subject }) @@ -51,7 +139,12 @@ it('writable count state', async () => { return ( <> count: {state} - + ) } @@ -71,6 +164,84 @@ it('writable count state', async () => { await findByText('count: 9') }) +it('only subscribe once per atom', async () => { + const subject = new Subject() + let totalSubscriptions = 0 + const observable = new Observable((subscriber) => { + totalSubscriptions++ + subject.subscribe(subscriber) + }) + const observableAtom = atomWithObservable(() => observable) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText, rerender } = render( + + + + + + ) + await findByText('loading') + act(() => subject.next(1)) + await findByText('count: 1') + + rerender(
) + expect(totalSubscriptions).toEqual(1) + + rerender( + + + + + + ) + act(() => subject.next(2)) + await findByText('count: 2') + + expect(totalSubscriptions).toEqual(2) +}) + +it('cleanup subscription', async () => { + const subject = new Subject() + let activeSubscriptions = 0 + const observable = new Observable((subscriber) => { + activeSubscriptions++ + subject.subscribe(subscriber) + return () => { + activeSubscriptions-- + } + }) + const observableAtom = atomWithObservable(() => observable) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText, rerender } = render( + + + + + + ) + + await findByText('loading') + + act(() => subject.next(1)) + await findByText('count: 1') + + expect(activeSubscriptions).toEqual(1) + rerender(
) + await waitFor(() => expect(activeSubscriptions).toEqual(0)) +}) + it('resubscribe on remount', async () => { const subject = new Subject() const observableAtom = atomWithObservable(() => subject) @@ -136,20 +307,8 @@ it("count state with initialValue doesn't suspend", async () => { }) it('writable count state with initialValue', async () => { - const observableAtom = atomWithObservable( - () => { - const observable = new Observable((subscriber) => { - subscriber.next(1) - }) - const subject = new Subject() - // is this usual to delay the subscription? - setTimeout(() => { - observable.subscribe(subject) - }, 100) - return subject - }, - { initialValue: 5 } - ) + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject, { initialValue: 5 }) const Counter = () => { const [state, dispatch] = useAtom(observableAtom) @@ -171,22 +330,83 @@ it('writable count state with initialValue', async () => { ) await findByText('count: 5') - + act(() => subject.next(1)) await findByText('count: 1') fireEvent.click(getByText('button')) await findByText('count: 9') }) -it('with initial value and synchronous subscription', async () => { - const observableAtom = atomWithObservable( - () => - new Observable((subscriber) => { - subscriber.next(1) - }), - { initialValue: 5 } +it('writable count state with error', async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state, dispatch] = useAtom(observableAtom) + + return ( + <> + count: {state} + + + ) + } + + const { findByText } = render( + + + + + + + + ) + + await findByText('loading') + + act(() => subject.error(new Error('Test Error'))) + findByText('Error: Test Error') +}) + +it('synchronous subscription with initial value', async () => { + const observableAtom = atomWithObservable(() => of(1), { initialValue: 5 }) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText } = render( + + + + ) + + await findByText('count: 1') +}) + +it('synchronous subscription with BehaviorSubject', async () => { + const observableAtom = atomWithObservable(() => new BehaviorSubject(1)) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state} + } + + const { findByText } = render( + + + ) + await findByText('count: 1') +}) + +it('synchronous subscription with already emitted value', async () => { + const observableAtom = atomWithObservable(() => of(1)) + const Counter = () => { const [state] = useAtom(observableAtom) @@ -202,9 +422,10 @@ it('with initial value and synchronous subscription', async () => { await findByText('count: 1') }) -it('behaviour subject', async () => { - const subject$ = new BehaviorSubject(1) - const observableAtom = atomWithObservable(() => subject$) +it('with falsy initial value', async () => { + const observableAtom = atomWithObservable(() => new Subject(), { + initialValue: 0, + }) const Counter = () => { const [state] = useAtom(observableAtom) @@ -212,6 +433,25 @@ it('behaviour subject', async () => { return <>count: {state} } + const { findByText } = render( + + + + ) + + await findByText('count: 0') +}) + +it('with initially emitted undefined value', async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state] = useAtom(observableAtom) + + return <>count: {state === undefined ? '-' : state} + } + const { findByText } = render( @@ -220,5 +460,46 @@ it('behaviour subject', async () => { ) + await findByText('loading') + act(() => subject.next(undefined)) + await findByText('count: -') + act(() => subject.next(1)) await findByText('count: 1') }) + +it("don't omit values emitted between init and mount", async () => { + const subject = new Subject() + const observableAtom = atomWithObservable(() => subject) + + const Counter = () => { + const [state, dispatch] = useAtom(observableAtom) + + return ( + <> + count: {state} + + + ) + } + + const { findByText, getByText } = render( + + + + + + ) + + await findByText('loading') + act(() => subject.next(1)) + act(() => subject.next(2)) + await findByText('count: 2') + + fireEvent.click(getByText('button')) + await findByText('count: 9') +})