Skip to content

Commit

Permalink
Merge pull request #5654 from WalletConnect/fix/ws-connection-to-resp…
Browse files Browse the repository at this point in the history
…ect-topics

fix: relayer would only connect if there are topics present in the cl…
  • Loading branch information
ganchoradkov authored Feb 20, 2025
2 parents d30cdee + e44dd89 commit 23cd671
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 33 deletions.
43 changes: 30 additions & 13 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export class Relayer extends IRelayer {
private heartBeatTimeout = toMiliseconds(THIRTY_SECONDS + FIVE_SECONDS);
private reconnectTimeout: NodeJS.Timeout | undefined;
private connectPromise: Promise<void> | undefined;
private reconnectInProgress = false;
private requestsInFlight: string[] = [];
private connectTimeout = toMiliseconds(ONE_SECOND * 15);
constructor(opts: RelayerOptions) {
Expand Down Expand Up @@ -126,8 +127,7 @@ export class Relayer extends IRelayer {
this.registerEventListeners();
await Promise.all([this.messages.init(), this.subscriber.init()]);
this.initialized = true;
// @ts-expect-error - .cached is private
if (this.subscriber.cached.length > 0) {
if (this.subscriber.hasAnyTopics) {
try {
await this.transportOpen();
} catch (e) {
Expand All @@ -142,12 +142,16 @@ export class Relayer extends IRelayer {

get connected() {
// @ts-expect-error
return this.provider?.connection?.socket?.readyState === 1 ?? false;
return this.provider?.connection?.socket?.readyState === 1 || false;
}

get connecting() {
// @ts-expect-error
return this.provider?.connection?.socket?.readyState === 0 ?? false;
return (
// @ts-expect-error
this.provider?.connection?.socket?.readyState === 0 ||
this.connectPromise !== undefined ||
false
);
}

public async publish(topic: string, message: string, opts?: RelayerTypes.PublishOptions) {
Expand Down Expand Up @@ -268,6 +272,13 @@ export class Relayer extends IRelayer {
}

async transportOpen(relayUrl?: string) {
if (!this.subscriber.hasAnyTopics) {
this.logger.warn(
"Starting WS connection skipped because the client has no topics to work with.",
);
return;
}

if (this.connectPromise) {
this.logger.debug({}, `Waiting for existing connection attempt to resolve...`);
await this.connectPromise;
Expand Down Expand Up @@ -347,9 +358,11 @@ export class Relayer extends IRelayer {
this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
let attempt = 1;

while (attempt < 6) {
try {
if (this.transportExplicitlyClosed) {
break;
}
this.logger.debug({}, `Connecting to ${this.relayUrl}, attempt: ${attempt}...`);
// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
Expand All @@ -374,7 +387,6 @@ export class Relayer extends IRelayer {
.finally(() => {
this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
});
await new Promise(async (resolve, reject) => {
const onDisconnect = () => {
Expand Down Expand Up @@ -558,7 +570,7 @@ export class Relayer extends IRelayer {
};

private onProviderErrorHandler = (error: Error) => {
this.logger.fatal(error, `Fatal socket error: ${(error as Error)?.message}`);
this.logger.fatal(`Fatal socket error: ${error.message}`);
this.events.emit(RELAYER_EVENTS.error, error);
// close the transport when a fatal error is received as there's no way to recover from it
// usual cases are missing/invalid projectId, expired jwt token, invalid origin etc
Expand Down Expand Up @@ -602,18 +614,23 @@ export class Relayer extends IRelayer {
}

private async onProviderDisconnect() {
await this.subscriber.stop();
clearTimeout(this.pingTimeout);
this.events.emit(RELAYER_EVENTS.disconnect);
this.connectionAttemptInProgress = false;
if (this.reconnectInProgress) return;

this.reconnectInProgress = true;
await this.subscriber.stop();

if (!this.subscriber.hasAnyTopics) return;
if (this.transportExplicitlyClosed) return;
if (this.reconnectTimeout) return;
if (this.connectPromise) return;

this.reconnectTimeout = setTimeout(async () => {
clearTimeout(this.reconnectTimeout);
await this.transportOpen().catch((error) =>
this.logger.error(error, (error as Error)?.message),
);
this.reconnectTimeout = undefined;
this.reconnectInProgress = false;
}, toMiliseconds(RELAYER_RECONNECT_TIMEOUT));
}

Expand All @@ -627,6 +644,6 @@ export class Relayer extends IRelayer {
private async toEstablishConnection() {
await this.confirmOnlineStateOrThrow();
if (this.connected) return;
await this.transportOpen();
await this.connect();
}
}
19 changes: 15 additions & 4 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ export class Subscriber extends ISubscriber {
return this.topicMap.topics;
}

get hasAnyTopics() {
return (
this.topicMap.topics.length > 0 ||
this.pending.size > 0 ||
this.cached.length > 0 ||
this.subscriptions.size > 0
);
}

public subscribe: ISubscriber["subscribe"] = async (topic, opts) => {
this.isInitialized();
this.logger.debug(`Subscribing Topic`);
Expand All @@ -114,7 +123,6 @@ export class Subscriber extends ISubscriber {
};

public unsubscribe: ISubscriber["unsubscribe"] = async (topic, opts) => {
await this.restartToComplete();
this.isInitialized();
if (typeof opts?.id !== "undefined") {
await this.unsubscribeById(topic, opts.id, opts);
Expand Down Expand Up @@ -206,8 +214,10 @@ export class Subscriber extends ISubscriber {
private async unsubscribeById(topic: string, id: string, opts?: RelayerTypes.UnsubscribeOptions) {
this.logger.debug(`Unsubscribing Topic`);
this.logger.trace({ type: "method", method: "unsubscribe", params: { topic, id, opts } });

try {
const relay = getRelayProtocolName(opts);
await this.restartToComplete({ topic, id, relay });
await this.rpcUnsubscribe(topic, id, relay);
const reason = getSdkError("USER_DISCONNECTED", `${this.name}, ${topic}`);
await this.onUnsubscribe(topic, id, reason);
Expand All @@ -225,8 +235,8 @@ export class Subscriber extends ISubscriber {
relay: RelayerTypes.ProtocolOptions,
opts?: RelayerTypes.SubscribeOptions,
) {
if (opts?.transportType === TRANSPORT_TYPES.relay) {
await this.restartToComplete();
if (!opts || opts?.transportType === TRANSPORT_TYPES.relay) {
await this.restartToComplete({ topic, id: topic, relay });
}
const api = getRelayProtocolApi(relay.protocol);
const request: RequestArguments<RelayJsonRpc.SubscribeParams> = {
Expand Down Expand Up @@ -559,8 +569,9 @@ export class Subscriber extends ISubscriber {
}
}

private async restartToComplete() {
private async restartToComplete(subscription: SubscriberTypes.Active) {
if (!this.relayer.connected && !this.relayer.connecting) {
this.cached.push(subscription);
await this.relayer.transportOpen();
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/core/test/expirer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
CORE_DEFAULT,
} from "../src";
import { disconnectSocket, TEST_CORE_OPTIONS } from "./shared";
import { generateRandomBytes32 } from "../../utils/src";

describe("Expirer", () => {
const logger = pino(getDefaultLoggerOptions({ level: CORE_DEFAULT.logger }));
Expand All @@ -37,7 +38,7 @@ describe("Expirer", () => {
it("should expire payload", async () => {
const core = new Core(TEST_CORE_OPTIONS);
await core.start();
await core.relayer.transportOpen();
await core.relayer.subscribe(generateRandomBytes32());
// confirm the expirer is empty
expect(core.expirer.length).to.eq(0);
// set a payload
Expand Down
65 changes: 54 additions & 11 deletions packages/core/test/relayer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import * as utils from "@walletconnect/utils";
describe("Relayer", () => {
const logger = pino(getDefaultLoggerOptions({ level: CORE_DEFAULT.logger }));

let core;
let relayer;
let core: ICore;
let relayer: IRelayer;
const randomTopic = generateRandomBytes32();

describe("init", () => {
let initSpy: Sinon.SinonSpy;
Expand All @@ -39,13 +40,15 @@ describe("Relayer", () => {
core = new Core(TEST_CORE_OPTIONS);
relayer = core.relayer;
await core.start();
relayer.subscriber.topicMap.set(randomTopic, randomTopic);
});
afterEach(async () => {
await disconnectSocket(relayer);
});

it("should not throw unhandled on network disconnect when there is no provider instance", async () => {
relayer.messages.init = initSpy;
relayer.subscriber.topicMap.clear();
await relayer.init();
expect(relayer.provider).to.be.empty;
expect(relayer.connected).to.be.false;
Expand Down Expand Up @@ -208,6 +211,7 @@ describe("Relayer", () => {
core = new Core(TEST_CORE_OPTIONS);
relayer = core.relayer;
await core.start();
relayer.subscriber.topicMap.set(randomTopic, randomTopic);
await relayer.transportOpen();
});
it("calls `subscriber.unsubscribe` with the passed topic", async () => {
Expand Down Expand Up @@ -269,6 +273,11 @@ describe("Relayer", () => {
core = new Core(TEST_CORE_OPTIONS);
relayer = core.relayer;
await core.start();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
});
it("should restart transport after connection drop", async () => {
const randomSessionIdentifier = relayer.core.crypto.randomSessionIdentifier;
Expand Down Expand Up @@ -343,6 +352,11 @@ describe("Relayer", () => {
projectId: TEST_CORE_OPTIONS.projectId,
});
await relayer.init();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
await relayer.transportOpen();
const wsConnection = relayer.provider.connection as unknown as WebSocket;
expect(relayer.connected).to.be.true;
Expand Down Expand Up @@ -370,7 +384,7 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
await relayer.subscribe(randomTopic);

// @ts-expect-error - accessing private property for testing
const wsUrl = relayer.provider.connection.url;
Expand All @@ -391,7 +405,7 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
await relayer.subscribe(randomTopic);

// @ts-expect-error - accessing private property for testing
const wsUrl = relayer.provider.connection.url;
Expand All @@ -412,12 +426,21 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
relayer.provider.on(RELAYER_PROVIDER_EVENTS.payload, (payload) => {
expect(payload.error.message).to.include("Unauthorized: origin not allowed");

relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});

let errorReceived = false;
relayer.on(RELAYER_EVENTS.error, (payload) => {
expect(payload.message).to.include("Unauthorized: origin not allowed");
errorReceived = true;
});
await relayer.transportOpen().catch((e) => {});
await throttle(1000);
expect(errorReceived).to.be.true;
});

it("[iOS] bundleId included in Cloud Settings - should connect", async () => {
Expand All @@ -433,7 +456,7 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
await relayer.subscribe(randomTopic);

// @ts-expect-error - accessing private property for testing
const wsUrl = relayer.provider.connection.url;
Expand All @@ -453,6 +476,11 @@ describe("Relayer", () => {
});

await relayer.init();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
await relayer.transportOpen();

// @ts-expect-error - accessing private property for testing
Expand All @@ -474,12 +502,22 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
relayer.provider.on(RELAYER_PROVIDER_EVENTS.payload, (payload) => {
expect(payload.error.message).to.include("Unauthorized: origin not allowed");
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});

let errorReceived = false;
relayer.on(RELAYER_EVENTS.error, (payload) => {
expect(payload.message).to.include("Unauthorized: origin not allowed");
errorReceived = true;
});

await relayer.transportOpen().catch((e) => {});

await throttle(1000);
expect(errorReceived).to.be.true;
});

it("[Web] packageName and bundleId not set - should connect", async () => {
Expand All @@ -495,6 +533,11 @@ describe("Relayer", () => {
});

await relayer.init();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
await relayer.transportOpen();

// @ts-expect-error - accessing private property for testing
Expand Down
Loading

0 comments on commit 23cd671

Please sign in to comment.