Skip to content

Commit

Permalink
connection retries
Browse files Browse the repository at this point in the history
  • Loading branch information
Moshe Shababo authored and sangaman committed Sep 8, 2018
1 parent 15ea3f8 commit 1773836
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 63 deletions.
4 changes: 2 additions & 2 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ class GrpcService {
*/
public disconnect: grpc.handleUnaryCall<xudrpc.DisconnectRequest, xudrpc.DisconnectResponse> = async (call, callback) => {
try {
const disconnectResponse = await this.service.disconnect(call.request.toObject());
await this.service.disconnect(call.request.toObject());
const response = new xudrpc.DisconnectResponse();
response.setResult(disconnectResponse);
response.setResult('success');
callback(null, response);
} catch (err) {
callback(this.getGrpcError(err), null);
Expand Down
28 changes: 26 additions & 2 deletions lib/p2p/NodeList.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import P2PRepository from './P2PRepository';
import { NodeInstance, NodeFactory } from '../types/db';
import { NodeConnectionInfo, Address } from '../types/p2p';
import { Address } from '../types/p2p';
import addressUtils from '../utils/addressUtils';

/** Represents a list of nodes for managing network peers activity */
class NodeList {
Expand Down Expand Up @@ -69,13 +70,36 @@ class NodeList {
public updateAddresses = async (nodePubKey: string, addresses: Address[] = []) => {
const node = this.nodes.get(nodePubKey);
if (node) {
node.addresses = addresses;
// avoid overriding the `lastConnected` field for existing matching addresses unless a new value was set
node.addresses = addresses.map((newAddress) => {
const oldAddress = node.addresses.find(address => addressUtils.areEqual(address, newAddress));
if (oldAddress && !newAddress.lastConnected) {
return oldAddress;
} else {
return newAddress;
}
});

await this.repository.updateNode(node);
return true;
}

return false;
}

public removeAddress = async (nodePubKey: string, address: Address) => {
const node = this.nodes.get(nodePubKey);
if (node) {
const index = node.addresses.findIndex(existingAddress => addressUtils.areEqual(address, existingAddress));
if (index > -1) {
node.addresses = [...node.addresses.slice(0, index), ...node.addresses.slice(index + 1)];
await this.repository.updateNode(node);
return true;
}
}

return false;
}
}

export default NodeList;
71 changes: 52 additions & 19 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ interface Peer {

/** Represents a remote XU peer */
class Peer extends EventEmitter {
// TODO: properties documentation
public socketAddress!: Address;
public inbound!: boolean;
public connected = false;
Expand All @@ -62,10 +61,16 @@ class Peer extends EventEmitter {
private static STALL_INTERVAL = 5000;
/** Interval for pinging peers. */
private static PING_INTERVAL = 30000;
/** Socket connection timeout for outbound peers. */
private static CONNECTION_TIMEOUT = 10000;
/** Response timeout for response packets. */
private static RESPONSE_TIMEOUT = 10000;
/** Socket connection timeout for outbound peers. */
private static CONNECTION_TIMEOUT = 10000;
/** Connection retries min delay. */
private static CONNECTION_RETRIES_MIN_DELAY = 5000;
/** Connection retries max delay. */
private static CONNECTION_RETRIES_MAX_DELAY = 3600000;
/** Connection retries max period. */
private static CONNECTION_RETRIES_MAX_PERIOD = 604800000;

public get nodePubKey(): string | undefined {
return this.handshakeState ? this.handshakeState.nodePubKey : undefined;
Expand Down Expand Up @@ -122,14 +127,17 @@ class Peer extends EventEmitter {
* and emit the `open` event if everything succeeds. Throw an error on unexpected handshake data.
* @param handshakeData our handshake data to send to the peer
* @param nodePubKey the expected nodePubKey of the node we are opening a connection with
* @param retryConnecting whether to retry to connect upon failure
*/
public open = async (handshakeData: HandshakeState, nodePubKey?: string): Promise<void> => {
public open = async (handshakeData: HandshakeState, nodePubKey?: string, retryConnecting?: boolean): Promise<void> => {
assert(!this.opened);
assert(!this.closed);
assert(this.inbound || nodePubKey);
assert(!retryConnecting || !this.inbound);

this.opened = true;

await this.initConnection();
await this.initConnection(retryConnecting);
this.initStall();
await this.initHello(handshakeData);

Expand Down Expand Up @@ -234,7 +242,7 @@ class Peer extends EventEmitter {
* Ensure we are connected (for inbound connections) or listen for the `connect` socket event (for outbound connections)
* and set the [[connectTime]] timestamp. If an outbound connection attempt errors or times out, throw an error.
*/
private initConnection = async () => {
private initConnection = async (retry = false) => {
assert(this.socket);

if (this.connected) {
Expand All @@ -244,7 +252,12 @@ class Peer extends EventEmitter {
return;
}

return new Promise<void>((resolve, reject) => {
return new Promise((resolve, reject) => {
const address: Address = { port: this.socketAddress.port, host: this.socketAddress.host };
const startTime = Date.now();
let retryDelay = Peer.CONNECTION_RETRIES_MIN_DELAY;
let retries = 0;

const cleanup = () => {
if (this.connectTimeout) {
clearTimeout(this.connectTimeout);
Expand All @@ -256,11 +269,6 @@ class Peer extends EventEmitter {
}
};

const onError = (err: Error) => {
cleanup();
reject(err);
};

const onConnect = () => {
this.connectTime = Date.now();
this.connected = true;
Expand All @@ -271,16 +279,41 @@ class Peer extends EventEmitter {
resolve();
};

const onTimeout = () => {
const onError = (err: Error) => {
cleanup();
reject(new Error('Connection timed out.'));
};

this.socket!.once('connect', onConnect);
if (!retry) {
this.close();
reject(err);
return;
}

this.socket!.once('error', onError);
if (Date.now() - startTime + retryDelay > Peer.CONNECTION_RETRIES_MAX_PERIOD) {
this.close();
reject(errors.CONNECTING_RETRIES_MAX_PERIOD_EXCEEDED);
return;
}

this.connectTimeout = setTimeout(onTimeout, Peer.CONNECTION_TIMEOUT);
this.logger.debug(
`Connection attempt #${retries + 1} to peer (${addressUtils.toString(this.socketAddress)}) ` +
`failed: ${err.message}. retrying in ${retryDelay / 1000} sec...`,
);

setTimeout(() => {
retryDelay = Math.min(Peer.CONNECTION_RETRIES_MAX_DELAY, retryDelay * 2);
retries = retries + 1;
this.socket!.connect(address);
bind();
}, retryDelay);
};

const bind = () => {
this.socket!.once('connect', onConnect);
this.socket!.once('error', onError);
this.connectTimeout = setTimeout(() => onError(new Error('Connection timed out')), Peer.CONNECTION_TIMEOUT);
};

bind();
});
}

Expand Down Expand Up @@ -434,8 +467,8 @@ class Peer extends EventEmitter {
} else {
this.logger.info(`Peer ${this.nodePubKey} socket closed`);
}
this.close();
}
this.close();
});

this.socket.on('data', (data) => {
Expand Down
78 changes: 56 additions & 22 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import net, { Server, Socket } from 'net';
import { EventEmitter } from 'events';
import errors from './errors';
import errors, { errorCodes } from './errors';
import Peer, { PeerInfo } from './Peer';
import NodeList from './NodeList';
import PeerList from './PeerList';
Expand Down Expand Up @@ -115,7 +115,7 @@ class Pool extends EventEmitter {

this.logger.info('Connecting to known / previously connected peers');
await this.nodes.load();
this.connectNodes(this.nodes).then(() => {
this.connectNodes(this.nodes, false, true).then(() => {
this.logger.info('Completed start-up connections to known peers.');
}).catch((reason) => {
this.logger.error('Unexpected error connecting to known peers on startup', reason);
Expand Down Expand Up @@ -163,16 +163,17 @@ class Pool extends EventEmitter {
* If the node is banned, already connected, or has no listening addresses, then do nothing.
* @param nodes a collection of nodes with a `forEach` iterator to attempt to connect to
* @param ignoreKnown whether to ignore nodes we are already aware of, defaults to false
* @param retryConnecting whether to attempt retry connecting, defaults to false
* @returns a promise that will resolve when all outbound connections resolve
*/
private connectNodes = (nodes: NodeConnectionIterator, ignoreKnown = false) => {
private connectNodes = (nodes: NodeConnectionIterator, ignoreKnown = false, retryConnecting = false) => {
const connectionPromises: Promise<void>[] = [];
nodes.forEach((node) => {
// check that this node is not ourselves, that it has listening addresses,
// and that either we haven't heard of it, or we're not ignoring known nodes and it's not banned
if (node.nodePubKey !== this.handshakeData.nodePubKey && node.addresses.length > 0 &&
(!this.nodes.has(node.nodePubKey) || (!ignoreKnown && !this.nodes.isBanned(node.nodePubKey)))) {
connectionPromises.push(this.connectNode(node));
connectionPromises.push(this.tryConnectNode(node, retryConnecting));
}
});
return Promise.all(connectionPromises);
Expand All @@ -181,14 +182,27 @@ class Pool extends EventEmitter {
/**
* Attempt to create an outbound connection to a node using its known listening addresses.
*/
private connectNode = async ({ addresses, nodePubKey }: NodeConnectionInfo) => {
for (let n = 0; n < addresses.length; n += 1) {
private tryConnectNode = async (node: NodeConnectionInfo, retryConnecting = false) => {
const { addresses, nodePubKey } = node;

// sort by lastConnected desc
const sortedAddresses = [...addresses].sort((a, b) => {
if (!a.lastConnected) return 1;
if (!b.lastConnected) return -1;
return b.lastConnected - a.lastConnected;
});

for (const address of sortedAddresses) {
try {
await this.addOutbound(addresses[n], nodePubKey);
break; // once we've successfully established an outbound connection, stop attempting new connections
} catch (err) {
this.logger.info(err);
}
await this.addOutbound(address, nodePubKey, false);
return; // once we've successfully established an outbound connection, stop attempting new connections
} catch (err) {}
}

if (retryConnecting && sortedAddresses.length && sortedAddresses[0].lastConnected) {
try {
await this.addOutbound(sortedAddresses[0], nodePubKey, true);
} catch (err) {}
}
}

Expand All @@ -199,7 +213,7 @@ class Pool extends EventEmitter {
* @param nodePubKey the nodePubKey of the node to connect to
* @returns the connected peer
*/
public addOutbound = async (address: Address, nodePubKey: string): Promise<Peer> => {
public addOutbound = async (address: Address, nodePubKey: string, retryConnecting: boolean): Promise<Peer> => {
if (nodePubKey === this.handshakeData.nodePubKey) {
const err = errors.ATTEMPTED_CONNECTION_TO_SELF;
this.logger.warn(err.message);
Expand All @@ -210,7 +224,7 @@ class Pool extends EventEmitter {
}

const peer = Peer.fromOutbound(address, this.logger);
await this.tryOpenPeer(peer, nodePubKey);
await this.openPeer(peer, nodePubKey, retryConnecting);
return peer;
}

Expand All @@ -224,17 +238,27 @@ class Pool extends EventEmitter {
return peerInfos;
}

private tryOpenPeer = async (peer: Peer, nodePubKey?: string): Promise<void> => {
private tryOpenPeer = async (peer: Peer, nodePubKey?: string, retryConnecting?: boolean): Promise<void> => {
try {
await this.openPeer(peer, nodePubKey);
} catch (err) {
this.logger.warn(`error while opening connection to peer ${nodePubKey}: ${err.message}`);
}
await this.openPeer(peer, nodePubKey, retryConnecting);
} catch (err) {}
}

private openPeer = async (peer: Peer, nodePubKey?: string): Promise<void> => {
private openPeer = async (peer: Peer, nodePubKey?: string, retryConnecting?: boolean): Promise<void> => {
this.bindPeer(peer);
await peer.open(this.handshakeData, nodePubKey);
try {
await peer.open(this.handshakeData, nodePubKey, retryConnecting);
} catch (err) {
// we don't have `nodePubKey` for inbound connections, which might fail on handshake
const id = nodePubKey || addressUtils.toString(peer.socketAddress);
this.logger.warn(`error while opening connection to peer (${id}): ${err.message}`);

if (err.code === errorCodes.CONNECTING_RETRIES_MAX_PERIOD_EXCEEDED) {
await this.nodes.removeAddress(nodePubKey!, peer.socketAddress);
}

throw err;
}
}

public closePeer = async (nodePubKey: string): Promise<void> => {
Expand Down Expand Up @@ -370,14 +394,24 @@ class Pool extends EventEmitter {
peer.sendPacket(new packets.GetOrdersPacket());
peer.sendPacket(new packets.GetNodesPacket());

// if outbound, update the `lastConnected` field for the address we're actually connected to
const addresses = peer.inbound ? peer.addresses! : peer.addresses!.map((address) => {
if (addressUtils.areEqual(peer.socketAddress, address)) {
return { ...address, lastConnected: Date.now() };
} else {
return address;
}
});

// upserting the node entry
if (!this.nodes.has(peer.nodePubKey!)) {
await this.nodes.createNode({
addresses,
nodePubKey: peer.nodePubKey!,
addresses: peer.addresses!,
});
} else {
// the node is known, update its listening addresses
await this.nodes.updateAddresses(peer.nodePubKey!, peer.addresses);
await this.nodes.updateAddresses(peer.nodePubKey!, addresses);
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions lib/p2p/errors.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import errorCodesPrefix from '../constants/errorCodesPrefix';
import { Address } from '../types/p2p';

const codesPrefix = errorCodesPrefix.P2P;
const errorCodes = {
NODE_ALREADY_CONNECTED: codesPrefix.concat('.1'),
NOT_CONNECTED: codesPrefix.concat('.2'),
UNEXPECTED_NODE_PUB_KEY: codesPrefix.concat('.3'),
ATTEMPTED_CONNECTION_TO_SELF: codesPrefix.concat('.4'),
EXTERNAL_IP_UNRETRIEVABLE: codesPrefix.concat('.5'),
CONNECTING_RETRIES_MAX_PERIOD_EXCEEDED: codesPrefix.concat('.6'),
};

const errors = {
Expand All @@ -29,6 +32,10 @@ const errors = {
message: `could not retrieve external IP: ${err.message}`,
code: errorCodes.EXTERNAL_IP_UNRETRIEVABLE,
}),
CONNECTING_RETRIES_MAX_PERIOD_EXCEEDED: {
message: `Connection retry attempts to peer exceeded maximum time allotment`,
code: errorCodes.CONNECTING_RETRIES_MAX_PERIOD_EXCEEDED,
},
};

export { errorCodes };
Expand Down
4 changes: 2 additions & 2 deletions lib/service/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ class Service extends EventEmitter {
argChecks.HAS_NODE_PUB_KEY({ nodePubKey });
argChecks.HAS_HOST({ host });
argChecks.VALID_PORT({ port });
const peer = await this.pool.addOutbound({ host, port }, nodePubKey);

const peer = await this.pool.addOutbound({ host, port }, nodePubKey, false);
return peer.getStatus();
}

Expand All @@ -154,7 +155,6 @@ class Service extends EventEmitter {
const { nodePubKey } = args;
argChecks.HAS_NODE_PUB_KEY(args);
await this.pool.closePeer(nodePubKey);
return 'success';
}

/**
Expand Down
Loading

0 comments on commit 1773836

Please sign in to comment.