diff --git a/rpc/Connection.ts b/rpc/Connection.ts index 18a846ca2..181c8db82 100644 --- a/rpc/Connection.ts +++ b/rpc/Connection.ts @@ -51,10 +51,14 @@ export abstract class Connection { protected abstract close(): void - callResultPendings: Record> = {} async call(method: string, params: unknown[]) { - await this.ready() const id = this.nextId++ + return this.#call(id, method, params) + } + + callResultPendings: Record> = {} + async #call(id: number, method: string, params: unknown[]) { + await this.ready() const pending = deferred() this.callResultPendings[id] = pending this.send(id, method, params) @@ -62,6 +66,7 @@ export abstract class Connection { } subscriptionHandlers: Record = {} + subscriptionPendingInits: Record void> = {} async subscription( subscribe: string, unsubscribe: string, @@ -69,21 +74,33 @@ export abstract class Connection { handler: RpcSubscriptionHandler, signal: AbortSignal, ) { - const message = await this.call(subscribe, params) - if (signal.aborted) return - if (message.error) return handler(message) - const subscriptionId = message.result as string - this.subscriptionHandlers[subscriptionId] = handler - signal.addEventListener("abort", () => { - delete this.subscriptionHandlers[subscriptionId] - this.send(this.nextId++, unsubscribe, [subscriptionId]) - }) + const id = this.nextId++ + this.subscriptionPendingInits[id] = (subscriptionId) => { + delete this.subscriptionPendingInits[id] + if (signal.aborted) return + signal.addEventListener("abort", () => { + delete this.subscriptionHandlers[subscriptionId] + this.send(this.nextId++, unsubscribe, [subscriptionId]) + }) + this.subscriptionHandlers[subscriptionId] = handler + } + const message = await this.#call(id, subscribe, params) + if (signal.aborted) { + delete this.subscriptionPendingInits[id] + return + } + if (message.error) { + delete this.subscriptionPendingInits[id] + return handler(message) + } } handle(message: RpcIngressMessage) { if (typeof message.id === "number") { this.callResultPendings[message.id]?.resolve(message) delete this.callResultPendings[message.id] + const init = this.subscriptionPendingInits[message.id] + if (!message.error && init) init(message.result as string) } else if (message.params) { this.subscriptionHandlers[message.params.subscription]?.(message) } else { diff --git a/rpc/smoldot.test.ts b/rpc/smoldot.test.ts new file mode 100644 index 000000000..0f18b061a --- /dev/null +++ b/rpc/smoldot.test.ts @@ -0,0 +1,79 @@ +import { AddChainError } from "../deps/smoldot.ts" +import { deferred } from "../deps/std/async.ts" +import { assertEquals, assertRejects } from "../deps/std/testing/asserts.ts" +import { RpcSubscriptionMessage } from "./rpc_messages.ts" +import { SmoldotConnection } from "./smoldot.ts" + +Deno.test({ + name: "Smoldot", + sanitizeOps: false, + sanitizeResources: false, + async fn(t) { + await t.step("relay chain connection", async () => { + const relayChainSpec = await fetchText( + "https://raw.githubusercontent.com/paritytech/substrate-connect/main/packages/connect/src/connector/specs/polkadot.json", + ) + const connection = new SmoldotConnection({ relayChainSpec }) + await connection.ready() + const controller = new AbortController() + const pendingMessage = deferred() + connection.subscription( + "chainHead_unstable_follow", + "chainHead_unstable_unfollow", + [false], + (message) => { + controller.abort() + pendingMessage.resolve(message) + }, + controller.signal, + ) + const message = await pendingMessage + assertEquals((await message.params?.result as any).event, "initialized") + }) + + await t.step("parachain connection", async () => { + const relayChainSpec = await fetchText( + "https://raw.githubusercontent.com/paritytech/substrate-connect/main/packages/connect/src/connector/specs/westend2.json", + ) + const parachainSpec = await fetchText( + "https://raw.githubusercontent.com/paritytech/substrate-connect/main/projects/demo/src/assets/westend-westmint.json", + ) + const connection = new SmoldotConnection({ + relayChainSpec, + parachainSpec, + }) + await connection.ready() + const controller = new AbortController() + const pendingMessage = deferred() + connection.subscription( + "chainHead_unstable_follow", + "chainHead_unstable_unfollow", + [false], + (message) => { + controller.abort() + pendingMessage.resolve(message) + }, + controller.signal, + ) + const message = await pendingMessage + assertEquals((await message.params?.result as any).event, "initialized") + }) + + await t.step( + "invalid chain spec", + async () => { + await assertRejects( + async () => { + const connection = new SmoldotConnection({ relayChainSpec: "" }) + return connection.smoldotChainPending + }, + AddChainError, + ) + }, + ) + }, +}) + +async function fetchText(url: string) { + return (await fetch(url)).text() +}