Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: relayer would only connect if there are topics present in the cl… #5654

Merged
merged 12 commits into from
Feb 20, 2025
Merged
43 changes: 30 additions & 13 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export class Relayer extends IRelayer {
private heartBeatTimeout = toMiliseconds(THIRTY_SECONDS + FIVE_SECONDS);
private reconnectTimeout: NodeJS.Timeout | undefined;
private connectPromise: Promise<void> | undefined;
private reconnectInProgress = false;
private requestsInFlight: string[] = [];
private connectTimeout = toMiliseconds(ONE_SECOND * 15);
constructor(opts: RelayerOptions) {
Expand Down Expand Up @@ -126,8 +127,7 @@ export class Relayer extends IRelayer {
this.registerEventListeners();
await Promise.all([this.messages.init(), this.subscriber.init()]);
this.initialized = true;
// @ts-expect-error - .cached is private
if (this.subscriber.cached.length > 0) {
if (this.subscriber.hasAnyTopics) {
try {
await this.transportOpen();
} catch (e) {
Expand All @@ -142,12 +142,16 @@ export class Relayer extends IRelayer {

get connected() {
// @ts-expect-error
return this.provider?.connection?.socket?.readyState === 1 ?? false;
return this.provider?.connection?.socket?.readyState === 1 || false;
}

get connecting() {
// @ts-expect-error
return this.provider?.connection?.socket?.readyState === 0 ?? false;
return (
// @ts-expect-error
this.provider?.connection?.socket?.readyState === 0 ||
this.connectPromise !== undefined ||
false
);
}

public async publish(topic: string, message: string, opts?: RelayerTypes.PublishOptions) {
Expand Down Expand Up @@ -268,6 +272,13 @@ export class Relayer extends IRelayer {
}

async transportOpen(relayUrl?: string) {
if (!this.subscriber.hasAnyTopics) {
this.logger.warn(
"Starting WS connection skipped because the client has no topics to work with.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be warn level or debug?

Copy link
Member Author

@ganchoradkov ganchoradkov Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be an error and the call should be rejected technically but we want to avoid breaking changes. Warn specifies something is not behaving as expected and I think it fits best. Debug will rarely be used by devs as its quite noisy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. So we don't want folks to interact with this API? Can we test on AppKit just to make sure we don't get loads of warnings now unexpectedly?

);
return;
}

if (this.connectPromise) {
this.logger.debug({}, `Waiting for existing connection attempt to resolve...`);
await this.connectPromise;
Expand Down Expand Up @@ -347,9 +358,11 @@ export class Relayer extends IRelayer {
this.connectionAttemptInProgress = true;
this.transportExplicitlyClosed = false;
let attempt = 1;

while (attempt < 6) {
try {
if (this.transportExplicitlyClosed) {
break;
}
this.logger.debug({}, `Connecting to ${this.relayUrl}, attempt: ${attempt}...`);
// Always create new socket instance when trying to connect because if the socket was dropped due to `socket hang up` exception
// It wont be able to reconnect
Expand All @@ -374,7 +387,6 @@ export class Relayer extends IRelayer {
.finally(() => {
this.provider.off(RELAYER_PROVIDER_EVENTS.disconnect, onDisconnect);
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
});
await new Promise(async (resolve, reject) => {
const onDisconnect = () => {
Expand Down Expand Up @@ -558,7 +570,7 @@ export class Relayer extends IRelayer {
};

private onProviderErrorHandler = (error: Error) => {
this.logger.fatal(error, `Fatal socket error: ${(error as Error)?.message}`);
this.logger.fatal(`Fatal socket error: ${error.message}`);
this.events.emit(RELAYER_EVENTS.error, error);
// close the transport when a fatal error is received as there's no way to recover from it
// usual cases are missing/invalid projectId, expired jwt token, invalid origin etc
Expand Down Expand Up @@ -602,18 +614,23 @@ export class Relayer extends IRelayer {
}

private async onProviderDisconnect() {
await this.subscriber.stop();
clearTimeout(this.pingTimeout);
this.events.emit(RELAYER_EVENTS.disconnect);
this.connectionAttemptInProgress = false;
if (this.reconnectInProgress) return;

this.reconnectInProgress = true;
await this.subscriber.stop();

if (!this.subscriber.hasAnyTopics) return;
if (this.transportExplicitlyClosed) return;
if (this.reconnectTimeout) return;
if (this.connectPromise) return;

this.reconnectTimeout = setTimeout(async () => {
clearTimeout(this.reconnectTimeout);
await this.transportOpen().catch((error) =>
this.logger.error(error, (error as Error)?.message),
);
this.reconnectTimeout = undefined;
this.reconnectInProgress = false;
}, toMiliseconds(RELAYER_RECONNECT_TIMEOUT));
}

Expand All @@ -627,6 +644,6 @@ export class Relayer extends IRelayer {
private async toEstablishConnection() {
await this.confirmOnlineStateOrThrow();
if (this.connected) return;
await this.transportOpen();
await this.connect();
}
}
19 changes: 15 additions & 4 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@
return this.topicMap.topics;
}

get hasAnyTopics() {
return (
this.topicMap.topics.length > 0 ||
this.pending.size > 0 ||
this.cached.length > 0 ||
this.subscriptions.size > 0
);
}

public subscribe: ISubscriber["subscribe"] = async (topic, opts) => {
this.isInitialized();
this.logger.debug(`Subscribing Topic`);
Expand All @@ -114,7 +123,6 @@
};

public unsubscribe: ISubscriber["unsubscribe"] = async (topic, opts) => {
await this.restartToComplete();
this.isInitialized();
if (typeof opts?.id !== "undefined") {
await this.unsubscribeById(topic, opts.id, opts);
Expand Down Expand Up @@ -206,8 +214,10 @@
private async unsubscribeById(topic: string, id: string, opts?: RelayerTypes.UnsubscribeOptions) {
this.logger.debug(`Unsubscribing Topic`);
this.logger.trace({ type: "method", method: "unsubscribe", params: { topic, id, opts } });

try {
const relay = getRelayProtocolName(opts);
await this.restartToComplete({ topic, id, relay });
await this.rpcUnsubscribe(topic, id, relay);
const reason = getSdkError("USER_DISCONNECTED", `${this.name}, ${topic}`);
await this.onUnsubscribe(topic, id, reason);
Expand All @@ -225,8 +235,8 @@
relay: RelayerTypes.ProtocolOptions,
opts?: RelayerTypes.SubscribeOptions,
) {
if (opts?.transportType === TRANSPORT_TYPES.relay) {
await this.restartToComplete();
if (!opts || opts?.transportType === TRANSPORT_TYPES.relay) {
await this.restartToComplete({ topic, id: topic, relay });
}
const api = getRelayProtocolApi(relay.protocol);
const request: RequestArguments<RelayJsonRpc.SubscribeParams> = {
Expand Down Expand Up @@ -301,7 +311,7 @@
private async rpcBatchSubscribe(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay!.protocol);

Check warning on line 314 in packages/core/src/controllers/subscriber.ts

View workflow job for this annotation

GitHub Actions / code_style (lint)

Forbidden non-null assertion
const request: RequestArguments<RelayJsonRpc.BatchSubscribeParams> = {
method: api.batchSubscribe,
params: {
Expand Down Expand Up @@ -330,7 +340,7 @@
private async rpcBatchFetchMessages(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const relay = subscriptions[0].relay;
const api = getRelayProtocolApi(relay!.protocol);

Check warning on line 343 in packages/core/src/controllers/subscriber.ts

View workflow job for this annotation

GitHub Actions / code_style (lint)

Forbidden non-null assertion
const request: RequestArguments<RelayJsonRpc.BatchFetchMessagesParams> = {
method: api.batchFetchMessages,
params: {
Expand Down Expand Up @@ -559,8 +569,9 @@
}
}

private async restartToComplete() {
private async restartToComplete(subscription: SubscriberTypes.Active) {
if (!this.relayer.connected && !this.relayer.connecting) {
this.cached.push(subscription);
await this.relayer.transportOpen();
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/core/test/expirer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
CORE_DEFAULT,
} from "../src";
import { disconnectSocket, TEST_CORE_OPTIONS } from "./shared";
import { generateRandomBytes32 } from "../../utils/src";

describe("Expirer", () => {
const logger = pino(getDefaultLoggerOptions({ level: CORE_DEFAULT.logger }));
Expand All @@ -37,7 +38,7 @@ describe("Expirer", () => {
it("should expire payload", async () => {
const core = new Core(TEST_CORE_OPTIONS);
await core.start();
await core.relayer.transportOpen();
await core.relayer.subscribe(generateRandomBytes32());
// confirm the expirer is empty
expect(core.expirer.length).to.eq(0);
// set a payload
Expand Down
65 changes: 54 additions & 11 deletions packages/core/test/relayer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import * as utils from "@walletconnect/utils";
describe("Relayer", () => {
const logger = pino(getDefaultLoggerOptions({ level: CORE_DEFAULT.logger }));

let core;
let relayer;
let core: ICore;
let relayer: IRelayer;
const randomTopic = generateRandomBytes32();

describe("init", () => {
let initSpy: Sinon.SinonSpy;
Expand All @@ -39,13 +40,15 @@ describe("Relayer", () => {
core = new Core(TEST_CORE_OPTIONS);
relayer = core.relayer;
await core.start();
relayer.subscriber.topicMap.set(randomTopic, randomTopic);
});
afterEach(async () => {
await disconnectSocket(relayer);
});

it("should not throw unhandled on network disconnect when there is no provider instance", async () => {
relayer.messages.init = initSpy;
relayer.subscriber.topicMap.clear();
await relayer.init();
expect(relayer.provider).to.be.empty;
expect(relayer.connected).to.be.false;
Expand Down Expand Up @@ -208,6 +211,7 @@ describe("Relayer", () => {
core = new Core(TEST_CORE_OPTIONS);
relayer = core.relayer;
await core.start();
relayer.subscriber.topicMap.set(randomTopic, randomTopic);
await relayer.transportOpen();
});
it("calls `subscriber.unsubscribe` with the passed topic", async () => {
Expand Down Expand Up @@ -269,6 +273,11 @@ describe("Relayer", () => {
core = new Core(TEST_CORE_OPTIONS);
relayer = core.relayer;
await core.start();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
});
it("should restart transport after connection drop", async () => {
const randomSessionIdentifier = relayer.core.crypto.randomSessionIdentifier;
Expand Down Expand Up @@ -343,6 +352,11 @@ describe("Relayer", () => {
projectId: TEST_CORE_OPTIONS.projectId,
});
await relayer.init();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
await relayer.transportOpen();
const wsConnection = relayer.provider.connection as unknown as WebSocket;
expect(relayer.connected).to.be.true;
Expand Down Expand Up @@ -370,7 +384,7 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
await relayer.subscribe(randomTopic);

// @ts-expect-error - accessing private property for testing
const wsUrl = relayer.provider.connection.url;
Expand All @@ -391,7 +405,7 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
await relayer.subscribe(randomTopic);

// @ts-expect-error - accessing private property for testing
const wsUrl = relayer.provider.connection.url;
Expand All @@ -412,12 +426,21 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
relayer.provider.on(RELAYER_PROVIDER_EVENTS.payload, (payload) => {
expect(payload.error.message).to.include("Unauthorized: origin not allowed");

relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});

let errorReceived = false;
relayer.on(RELAYER_EVENTS.error, (payload) => {
expect(payload.message).to.include("Unauthorized: origin not allowed");
errorReceived = true;
});
await relayer.transportOpen().catch((e) => {});
await throttle(1000);
expect(errorReceived).to.be.true;
});

it("[iOS] bundleId included in Cloud Settings - should connect", async () => {
Expand All @@ -433,7 +456,7 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
await relayer.subscribe(randomTopic);

// @ts-expect-error - accessing private property for testing
const wsUrl = relayer.provider.connection.url;
Expand All @@ -453,6 +476,11 @@ describe("Relayer", () => {
});

await relayer.init();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
await relayer.transportOpen();

// @ts-expect-error - accessing private property for testing
Expand All @@ -474,12 +502,22 @@ describe("Relayer", () => {
});

await relayer.init();
await relayer.transportOpen();
relayer.provider.on(RELAYER_PROVIDER_EVENTS.payload, (payload) => {
expect(payload.error.message).to.include("Unauthorized: origin not allowed");
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});

let errorReceived = false;
relayer.on(RELAYER_EVENTS.error, (payload) => {
expect(payload.message).to.include("Unauthorized: origin not allowed");
errorReceived = true;
});

await relayer.transportOpen().catch((e) => {});

await throttle(1000);
expect(errorReceived).to.be.true;
});

it("[Web] packageName and bundleId not set - should connect", async () => {
Expand All @@ -495,6 +533,11 @@ describe("Relayer", () => {
});

await relayer.init();
relayer.subscriber.subscriptions.set(randomTopic, {
topic: randomTopic,
id: randomTopic,
relay: { protocol: "irn" },
});
await relayer.transportOpen();

// @ts-expect-error - accessing private property for testing
Expand Down
Loading
Loading