Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: offline state recovery for Filter subscription #2049

Merged
merged 26 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 71 additions & 27 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,19 @@
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 37 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 37 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
private online: boolean = false;

public isConnected(): boolean {
return this.online;
}
private isP2PNetworkConnected: boolean = false;

private toggleOnline(): void {
if (!this.online) {
this.online = true;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
public isConnected(): boolean {
if (globalThis?.navigator && !globalThis?.navigator?.onLine) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is tricky. I made it that was as to comply with behavior of

const storedPeersData = localStorage.getItem("waku:peers");

when if we run localStorage discovery in nodejs it won't fail tho it won't work

the thing is even though we don't support nodejs - there are some people running js-waku in such env

return false;
}
}

private toggleOffline(): void {
if (this.online && this.libp2p.getConnections().length == 0) {
this.online = false;
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.online
})
);
}
return this.isP2PNetworkConnected;
}

public static create(
Expand Down Expand Up @@ -103,6 +86,7 @@
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
this.stopNetworkStatusListener();
}

public async dropConnection(peerId: PeerId): Promise<void> {
Expand Down Expand Up @@ -193,7 +177,7 @@
options: keepAliveOptions
});

this.run()
this.startEventListeners()
.then(() => log.info(`Connection Manager is now running`))
.catch((error) =>
log.error(`Unexpected error while running service`, error)
Expand Down Expand Up @@ -225,11 +209,12 @@
}
}

private async run(): Promise<void> {
// start event listeners
private async startEventListeners(): Promise<void> {
this.startPeerDiscoveryListener();
this.startPeerConnectionListener();
this.startPeerDisconnectionListener();

this.startNetworkStatusListener();
}

private async dialPeer(peerId: PeerId): Promise<void> {
Expand Down Expand Up @@ -266,7 +251,7 @@
// Handle generic error
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message

Check warning on line 254 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 254 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}`
);
}
Expand Down Expand Up @@ -428,14 +413,18 @@
)
);
}
this.toggleOnline();

this.setP2PNetworkConnected();
})();
},
"peer:disconnect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
this.keepAliveManager.stop(evt.detail);
this.toggleOffline();
this.setP2PNetworkDisconnected();
})();
},
"browser:network": (): void => {
this.dispatchWakuConnectionEvent();
}
};

Expand Down Expand Up @@ -572,4 +561,59 @@
if (!shardInfoBytes) return undefined;
return decodeRelayShard(shardInfoBytes);
}

private startNetworkStatusListener(): void {
try {
globalThis.addEventListener(
"online",
this.onEventHandlers["browser:network"]
);
globalThis.addEventListener(
"offline",
this.onEventHandlers["browser:network"]
);
} catch (err) {
log.error(`Failed to start network listener: ${err}`);
}
}

private stopNetworkStatusListener(): void {
try {
globalThis.removeEventListener(
"online",
this.onEventHandlers["browser:network"]
);
globalThis.removeEventListener(
"offline",
this.onEventHandlers["browser:network"]
);
} catch (err) {
log.error(`Failed to stop network listener: ${err}`);
}
}

private setP2PNetworkConnected(): void {
if (!this.isP2PNetworkConnected) {
this.isP2PNetworkConnected = true;
this.dispatchWakuConnectionEvent();
}
}

private setP2PNetworkDisconnected(): void {
if (
this.isP2PNetworkConnected &&
this.libp2p.getConnections().length === 0
) {
this.isP2PNetworkConnected = false;
this.dispatchWakuConnectionEvent();
}
}

private dispatchWakuConnectionEvent(): void {
this.dispatchEvent(
new CustomEvent<boolean>(EConnectionStateEvents.CONNECTION_STATUS, {
detail: this.isConnected()
})
);
}
}
92 changes: 70 additions & 22 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
type ContentTopic,
type CoreProtocolResult,
type CreateSubscriptionResult,
EConnectionStateEvents,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
Expand Down Expand Up @@ -65,20 +66,22 @@ export class SubscriptionManager implements ISubscriptionSDK {
private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS;

private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
> = new Map();

public constructor(
private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly protocol: FilterCore,
private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();

const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
Expand All @@ -89,10 +92,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}

public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}

weboko marked this conversation as resolved.
Show resolved Hide resolved
private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);

Expand Down Expand Up @@ -155,9 +154,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

if (options.keepAlive) {
this.startKeepAlivePings(options);
}
this.subscribeOptions = options;
this.startSubscriptionsMaintenance(options);

return finalResult;
}
Expand All @@ -183,9 +181,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const finalResult = this.handleResult(results, "unsubscribe");

if (this.subscriptionCallbacks.size === 0) {
if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}
this.stopSubscriptionsMaintenance();
}

return finalResult;
Expand All @@ -211,9 +207,7 @@ export class SubscriptionManager implements ISubscriptionSDK {

const finalResult = this.handleResult(results, "unsubscribeAll");

if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}
this.stopSubscriptionsMaintenance();

return finalResult;
}
Expand Down Expand Up @@ -378,8 +372,19 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}

private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
private startSubscriptionsMaintenance(options: SubscribeOptions): void {
if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
}
this.startConnectionListener();
}

private stopSubscriptionsMaintenance(): void {
this.stopKeepAlivePings();
this.stopConnectionListener();
}

private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
Expand All @@ -389,7 +394,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error);
});
}, keepAlive) as unknown as number;
}, interval) as unknown as number;
}

private stopKeepAlivePings(): void {
Expand All @@ -403,6 +408,48 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.keepAliveTimer = null;
}

private startConnectionListener(): void {
this.connectionManager.addEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
);
}

private stopConnectionListener(): void {
this.connectionManager.removeEventListener(
EConnectionStateEvents.CONNECTION_STATUS,
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
);
}

private async connectionListener({
detail: isConnected
}: CustomEvent<boolean>): Promise<void> {
if (!isConnected) {
this.stopKeepAlivePings();
return;
}

try {
const result = await this.ping();
const renewPeerPromises = result.failures.map(
async (v): Promise<void> => {
if (v.peerId) {
await this.renewAndSubscribePeer(v.peerId);
}
}
);

await Promise.all(renewPeerPromises);
} catch (err) {
log.error(`networkStateListener failed to recover: ${err}`);
}

this.startKeepAlivePings(
this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive
);
}

private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
Expand All @@ -416,6 +463,7 @@ export class SubscriptionManager implements ISubscriptionSDK {

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private readonly _connectionManager: ConnectionManager;

private activeSubscriptions = new Map<string, SubscriptionManager>();

Expand Down Expand Up @@ -445,8 +493,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
);

this.protocol = this.core as FilterCore;

this.activeSubscriptions = new Map();
weboko marked this conversation as resolved.
Show resolved Hide resolved
this._connectionManager = connectionManager;
}

/**
Expand Down Expand Up @@ -576,6 +623,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
new SubscriptionManager(
pubsubTopic,
this.protocol,
this._connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
Expand Down
Loading
Loading