Skip to content

Commit

Permalink
Make providers WS and IPC reliable (#5763)
Browse files Browse the repository at this point in the history
* ws and ipc reconnection

* fix

* increase time for IPC

* fix ipc

* add error messages. add ganache to dev dep

* add ganache install to actions

* ws reconnection test with geth

* run as a docker

* debug.trace

* test

* skip tests

* test

* test

* test

* test

* ipc fixes

* fix ws tests

* fix

* stop 8545 port

* revert

* small refactor

* fix unit test

* try to stop docker

* wait port timeout

* kill 8547 port

* try stable version of geth

* fix

* fix path

* move helpers

Co-authored-by: Junaid <86780488+jdevcs@users.noreply.github.com>
  • Loading branch information
avkos and jdevcs authored Jan 25, 2023
1 parent bd4fcda commit 8e44e88
Show file tree
Hide file tree
Showing 16 changed files with 585 additions and 207 deletions.
2 changes: 1 addition & 1 deletion packages/web3-providers-ipc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"test:ci": "jest --coverage=true --coverage-reporters=json --verbose",
"test:watch": "npm test -- --watch",
"test:unit": "jest --config=./test/unit/jest.config.js",
"test:integration": "jest --config=./test/integration/jest.config.js --passWithNoTests"
"test:integration": "jest --config=./test/integration/jest.config.js"
},
"devDependencies": {
"@types/jest": "^28.1.6",
Expand Down
74 changes: 14 additions & 60 deletions packages/web3-providers-ipc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/

import { Socket } from 'net';
import { InvalidConnectionError, ConnectionNotOpenError, InvalidClientError } from 'web3-errors';
import { ConnectionNotOpenError, InvalidClientError } from 'web3-errors';
import { SocketProvider } from 'web3-utils';
import {
EthExecutionAPI,
JsonRpcId,
SocketRequestItem,
Web3APIMethod,
Web3APIPayload,
Web3APISpec,
Expand All @@ -38,34 +36,24 @@ export default class IpcProvider<API extends Web3APISpec = EthExecutionAPI> exte
Error,
API
> {
private _connectionStatus: Web3ProviderStatus;
// Message handlers. Due to bounding of `this` and removing the listeners we have to keep it's reference.
protected _socketConnection?: Socket;
public constructor(socketPath: string) {
super(socketPath);
this._connectionStatus = 'connecting';
}

public getStatus(): Web3ProviderStatus {
if (this._socketConnection?.connecting) {
return 'connecting';
}
return this._connectionStatus;
}
public connect(): void {
protected _openSocketConnection() {
if (!existsSync(this._socketPath)) {
throw new InvalidClientError(this._socketPath);
}
if (!this._socketConnection || this.getStatus() === 'disconnected') {
this._socketConnection = new Socket();
}
try {
this._connectionStatus = 'connecting';
this._addSocketListeners();
this._socketConnection.connect({ path: this._socketPath });
} catch (e) {
throw new InvalidConnectionError(this._socketPath);
}

this._socketConnection.connect({ path: this._socketPath });
}

protected _closeSocketConnection(code?: number, data?: string) {
Expand All @@ -83,31 +71,12 @@ export default class IpcProvider<API extends Web3APISpec = EthExecutionAPI> exte
this._socketConnection?.write(JSON.stringify(payload));
}

protected _onCloseEvent(event: CloseEvent): void {
if (
this._reconnectOptions.autoReconnect &&
(![1000, 1001].includes(event.code) || !event.wasClean)
) {
this._reconnect();
return;
}

this._clearQueues(event);
this._removeSocketListeners();
this._onDisconnect(event.code, event.reason);
}

protected _parseResponses(e: Buffer | string) {
return this.chunkResponseParser.parseResponse(
typeof e === 'string' ? e : e.toString('utf8'),
);
}

protected _onClose(event: CloseEvent): void {
this._clearQueues(event);
this._removeSocketListeners();
}

protected _addSocketListeners(): void {
this._socketConnection?.on('data', this._onMessageHandler);
this._socketConnection?.on('connect', this._onOpenHandler);
Expand Down Expand Up @@ -136,35 +105,20 @@ export default class IpcProvider<API extends Web3APISpec = EthExecutionAPI> exte
this._socketConnection?.removeAllListeners('data');
}

protected _clearQueues(event?: CloseEvent) {
if (this._pendingRequestsQueue.size > 0) {
this._pendingRequestsQueue.forEach(
(request: SocketRequestItem<any, any, any>, key: JsonRpcId) => {
request.deferredPromise.reject(new ConnectionNotOpenError(event));
this._pendingRequestsQueue.delete(key);
},
);
}

if (this._sentRequestsQueue.size > 0) {
this._sentRequestsQueue.forEach(
(request: SocketRequestItem<any, any, any>, key: JsonRpcId) => {
request.deferredPromise.reject(new ConnectionNotOpenError(event));
this._sentRequestsQueue.delete(key);
},
);
protected _onCloseEvent(event: CloseEvent): void {
if (!event && this._reconnectOptions.autoReconnect) {
this._connectionStatus = 'disconnected';
this._reconnect();
return;
}

this._clearQueues(event);
this._removeSocketListeners();
this._onDisconnect(event?.code, event?.reason);
}

protected _onConnect() {
this._connectionStatus = 'connected';
super._onConnect();
}

protected _onDisconnect(code?: number, data?: string): void {
this._connectionStatus = 'disconnected';
super._onDisconnect(code, data);
protected _onClose(event: CloseEvent): void {
this._clearQueues(event);
this._removeSocketListeners();
}
}
57 changes: 57 additions & 0 deletions packages/web3-providers-ipc/test/fixtures/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
This file is part of web3.js.
web3.js is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
web3.js is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/

import { exec } from 'child_process';
import path from 'path';
import fs from 'fs';

const IPC_DIR_PATH = path.join(__dirname, '..', '..', '..', '..', 'tmp');
const IPC_PATH = path.join(IPC_DIR_PATH, 'some.ipc');
const IPC_ORIGIN_PATH = path.join(IPC_DIR_PATH, 'some.ipc');

const createSymlink = `ln -s ${path.join(IPC_DIR_PATH, 'ipc.ipc')} ${IPC_ORIGIN_PATH}`;

const execPromise = async (command: string): Promise<string> =>
new Promise((resolve, reject) => {
exec(command, (error, stdout, stderr) => {
if (error) {
reject(error);
return;
}
if (stderr) {
reject(stderr);
return;
}
resolve(stdout);
});
});

const removeIfExists = () => {
if (fs.existsSync(IPC_PATH)) {
fs.unlinkSync(IPC_PATH);
}
};
export const startGethServer = async (): Promise<{ path: string; close: () => void }> => {
removeIfExists();
await execPromise(createSymlink);
return {
path: IPC_PATH,
close: (): void => {
removeIfExists();
},
};
};
104 changes: 104 additions & 0 deletions packages/web3-providers-ipc/test/integration/reconnection.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
This file is part of web3.js.
web3.js is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
web3.js is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/

import IpcProvider from '../../src';

import {
describeIf,
getSystemTestProvider,
isIpc,
waitForOpenSocketConnection,
waitForCloseSocketConnection,
waitForEvent,
} from '../fixtures/system_test_utils';
import { startGethServer } from '../fixtures/helpers';

describeIf(isIpc)('IpcSocketProvider - reconnection', () => {
describe('subscribe event tests', () => {
let reconnectionOptions: {
delay: number;
autoReconnect: boolean;
maxAttempts: number;
};
beforeAll(() => {
reconnectionOptions = {
delay: 50,
autoReconnect: true,
maxAttempts: 1000,
};
});
it('check defaults', async () => {
const web3Provider = new IpcProvider(getSystemTestProvider());
// @ts-expect-error-next-line
expect(web3Provider._reconnectOptions).toEqual({
autoReconnect: true,
delay: 5000,
maxAttempts: 5,
});
await waitForOpenSocketConnection(web3Provider);
web3Provider.disconnect(1000, 'test');
await waitForCloseSocketConnection(web3Provider);
});
it('set custom reconnectOptions', async () => {
const web3Provider = new IpcProvider(getSystemTestProvider(), {}, reconnectionOptions);
// @ts-expect-error-next-line
expect(web3Provider._reconnectOptions).toEqual(reconnectionOptions);
await waitForOpenSocketConnection(web3Provider);
web3Provider.disconnect(1000, 'test');
await waitForCloseSocketConnection(web3Provider);
});
it('should emit connect and disconnected events', async () => {
const web3Provider = new IpcProvider(getSystemTestProvider());
expect(!!(await waitForEvent(web3Provider, 'connect'))).toBe(true);
const disconnectPromise = waitForEvent(web3Provider, 'disconnect');
web3Provider.disconnect();
expect(!!(await disconnectPromise)).toBe(true);
// @ts-expect-error read protected property
expect(web3Provider.isReconnecting).toBe(false);
});
it('should connect, disconnect and reconnect', async () => {
const web3Provider = new IpcProvider(getSystemTestProvider(), {}, reconnectionOptions);
expect(!!(await waitForEvent(web3Provider, 'connect'))).toBe(true);
const connectEvent = waitForEvent(web3Provider, 'connect');
// @ts-expect-error call protected function
web3Provider._socketConnection?.end();
expect(!!(await connectEvent)).toBe(true);
web3Provider.disconnect();
await waitForEvent(web3Provider, 'disconnect');
});
it('should connect, disconnect, try reconnect and reach max attempts', async () => {
const server = await startGethServer();
const web3Provider = new IpcProvider(
server.path,
{},
{
...reconnectionOptions,
delay: 1,
maxAttempts: 3,
},
);
expect(!!(await waitForEvent(web3Provider, 'connect'))).toBe(true);
server.close();
// @ts-expect-error call protected function
web3Provider._socketConnection?.end();
const errorEvent = waitForEvent(web3Provider, 'error');

const errorMessage = await errorEvent;
expect(errorMessage).toBe(`Max connection attempts exceeded (${3})`);
});
});
});
11 changes: 8 additions & 3 deletions packages/web3-providers-ipc/test/unit/ipc_provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/

import * as fs from 'fs';
import { InvalidClientError } from 'web3-errors';
import { ConnectionError, InvalidClientError } from 'web3-errors';
import IpcProvider from '../../src/index';

jest.mock('net');
Expand Down Expand Up @@ -56,8 +56,13 @@ describe('IpcProvider', () => {

it('should throw error if socket path does not exists', () => {
jest.spyOn(fs, 'existsSync').mockReturnValue(false);

expect(() => new IpcProvider(socketPath)).toThrow(new InvalidClientError(socketPath));
expect(() => new IpcProvider(socketPath)).toThrow(
new ConnectionError(
`Error while connecting to ${socketPath}. Reason: ${
new InvalidClientError(socketPath).message
}`,
),
);
});

it('should add listeners to socket', () => {
Expand Down
Loading

0 comments on commit 8e44e88

Please sign in to comment.