Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(utils): Initial data for atomWithObservable #1058

Merged
merged 9 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/utils/atom-with-observable.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Ref: https://github.com/pmndrs/jotai/pull/341
import { useAtom } from 'jotai'
import { atomWithObservable } from 'jotai/utils'
import { interval } from 'rxjs'
import { map } from "rxjs/operators"

const counterSubject = interval(1000).pipe(map((i) => `#${i}`));
const counterAtom = atomWithObservable(() => counterSubject);
Expand All @@ -24,5 +25,14 @@ Its value will be last value emitted from the stream.

To use this atom, you need to wrap your component with `<Suspense>`. Check out [basics/async](../basics/async.mdx).

## Initial value
`atomWithObservable` takes second optional parameter `{ initialValue }` that allows to specify initial value for the atom. If `initialValue` is provided then `atomWithObservable` will not suspend and will show initial value before receiving first value from observable. `initialValue` can be either a value or a function that returns a value

```ts
const counterAtom = atomWithObservable(() => counterSubject, {initialValue: 10});

const counterAtom2 = atomWithObservable(() => counterSubject, {initialValue: () => Math.random()});
```

### Codesandbox
<CodeSandbox id="88pnt" />
92 changes: 52 additions & 40 deletions src/utils/atomWithObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,69 +29,51 @@ type ObservableLike<T> = {

type SubjectLike<T> = ObservableLike<T> & Observer<T>

type InitialValueFunction<T> = () => T | undefined

type AtomWithObservableOptions<TData> = {
initialValue?: TData | InitialValueFunction<TData>
}

export function atomWithObservable<TData>(
createObservable: (get: Getter) => SubjectLike<TData>
createObservable: (get: Getter) => SubjectLike<TData>,
options?: AtomWithObservableOptions<TData>
): WritableAtom<TData, TData>

export function atomWithObservable<TData>(
createObservable: (get: Getter) => ObservableLike<TData>
createObservable: (get: Getter) => ObservableLike<TData>,
options?: AtomWithObservableOptions<TData>
): Atom<TData>

export function atomWithObservable<TData>(
createObservable: (get: Getter) => ObservableLike<TData> | SubjectLike<TData>
createObservable: (get: Getter) => ObservableLike<TData> | SubjectLike<TData>,
options?: AtomWithObservableOptions<TData>
) {
const observableResultAtom = atom((get) => {
let settlePromise: ((data: TData | null, err?: unknown) => void) | null =
null
let observable = createObservable(get)
const itself = observable[Symbol.observable]?.()
if (itself) {
observable = itself
}
const dataAtom = atom<TData | Promise<TData>>(
new Promise<TData>((resolve, reject) => {
settlePromise = (data, err) => {
if (err) {
reject(err)
} else {
resolve(data as TData)
}
}
})

const dataAtom = atom(
options?.initialValue
? getInitialValue(options)
: firstValueFrom(observable)
)
let setData: (data: TData | Promise<TData>) => void = () => {
throw new Error('setting data without mount')
}
const dataListener = (data: TData) => {
if (settlePromise) {
settlePromise(data)
settlePromise = null
if (subscription && !setData) {
subscription.unsubscribe()
subscription = null
}
} else {
setData(data)
}
setData(data)
}
const errorListener = (error: unknown) => {
if (settlePromise) {
settlePromise(null, error)
settlePromise = null
if (subscription && !setData) {
subscription.unsubscribe()
subscription = null
}
} else {
setData(Promise.reject<TData>(error))
}
setData(Promise.reject<TData>(error))
}
let subscription: Subscription | null = null
subscription = observable.subscribe(dataListener, errorListener)
if (!settlePromise) {
subscription.unsubscribe()
subscription = null
}

// FIXME this implementation is not fully compatible with concurrent rendering.
// we need to deal with the case `onMount` is not invoked after the atom is initialized.
dai-shi marked this conversation as resolved.
Show resolved Hide resolved
dataAtom.onMount = (update) => {
setData = update
if (!subscription) {
Expand All @@ -107,6 +89,7 @@ export function atomWithObservable<TData>(
const observableAtom = atom(
(get) => {
const { dataAtom } = get(observableResultAtom)

return get(dataAtom)
},
(get, _set, data: TData) => {
Expand All @@ -120,3 +103,32 @@ export function atomWithObservable<TData>(
)
return observableAtom
}

function getInitialValue<TData>(options: AtomWithObservableOptions<TData>) {
const initialValue = options.initialValue
return initialValue instanceof Function ? initialValue() : initialValue
}

function firstValueFrom<T>(source: ObservableLike<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
let resolved = false
const subscription = source.subscribe({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this looks cleaner, I think there's a risk of memory leaks.
Such case is, a) we create an atom, b) before getting the first value, c) atom can be unmounted.
We need to unsubscribe this subscription on onMount cleanup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dai-shi hmm, I thought that the suspended component is not mounted before the promise resolves. So, we don't actually have an option to unsubscribe in any case or do I miss something?

To illustrate, here is an atom that suspends and onMount is never called https://codesandbox.io/s/modern-glade-rutmog?file=/src/App.js:0-413

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right about it. The risk is actually the case onMount is never called. And, it's not covered with the previous impl either...

In jotai/urql code, we have a hacky solution with setTimeout.

let timer: NodeJS.Timeout | null = setTimeout(() => {
timer = null
subscriptionInRender.unsubscribe()
}, 1000)

Given that this isn't ideal, and this is such a rare case anyway. I think I can merge this PR. Maybe, we leave some comments for the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I've already changed other things. Should I do anything else here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment like this:

// FIXME this implementation is not fully compatible with concurrent rendering.
// we need to deal with the case `onMount` is not invoked after the atom is initialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dai-shi I've added the comment. Please have a look

next: (value) => {
resolve(value)
resolved = true
if (subscription) {
subscription.unsubscribe()
}
},
error: reject,
complete: () => {
reject()
},
})

if (resolved) {
// If subscription was resolved synchronously
subscription.unsubscribe()
}
})
}
90 changes: 90 additions & 0 deletions tests/utils/atomWithObservable.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,93 @@ it('resubscribe on remount', async () => {
act(() => subject.next(2))
await findByText('count: 2')
})

it("count state with initialValue doesn't suspend", async () => {
const subject = new Subject<number>()
const observableAtom = atomWithObservable(() => subject, { initialValue: 5 })

const Counter = () => {
const [state] = useAtom(observableAtom)

return <>count: {state}</>
}

const { findByText } = render(
<Provider>
<Counter />
</Provider>
)

await findByText('count: 5')

act(() => subject.next(10))

await findByText('count: 10')
})

it('writable count state with initialValue', async () => {
const observableAtom = atomWithObservable(
() => {
const observable = new Observable<number>((subscriber) => {
subscriber.next(1)
})
const subject = new Subject<number>()
// is this usual to delay the subscription?
setTimeout(() => {
observable.subscribe(subject)
}, 100)
return subject
},
{ initialValue: 5 }
)

const Counter = () => {
const [state, dispatch] = useAtom(observableAtom)

return (
<>
count: {state}
<button onClick={() => dispatch(9)}>button</button>
</>
)
}

const { findByText, getByText } = render(
<Provider>
<Suspense fallback="loading">
<Counter />
</Suspense>
</Provider>
)

await findByText('count: 5')

await findByText('count: 1')

fireEvent.click(getByText('button'))
await findByText('count: 9')
})

it('with initial value and synchronous subscription', async () => {
const observableAtom = atomWithObservable(
() =>
new Observable<number>((subscriber) => {
subscriber.next(1)
}),
{ initialValue: 5 }
)

const Counter = () => {
const [state] = useAtom(observableAtom)

return <>count: {state}</>
}

const { findByText } = render(
<Provider>
<Counter />
</Provider>
)

await findByText('count: 1')
})