-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconnectWithPeer.js
167 lines (156 loc) · 6.23 KB
/
connectWithPeer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import 'colors';
import BlockModel from 'models/Block';
import { formatBlock } from 'db/syncBlocksWithStore';
import net from 'net';
import { startMining } from 'mining/startMining';
import store from 'store/store';
import uniq from 'lodash/uniq';
import { wait } from 'utils';
// TCP port used when opening a connection to a peer.
const DEFAULT_PORT = 8334;
// Separator placed between the fields of a single wire message.
const DELIMITER = '~~~~~';
// Matches every DELIMITER occurrence; used to make received messages readable in logs.
const reg = new RegExp(DELIMITER, 'gi');
/**
 * Reports whether this node is in sync with its usable peers and, if it is
 * and mining has not started yet, kicks off mining.
 *
 * Peers flagged `unreachable` or `wrongVersion` are ignored. The node counts
 * as synced only when the remaining peers all share one distinct `synced`
 * value and that value is truthy.
 *
 * @returns {Promise<*>} `false` when the peers disagree (or there are none);
 *   otherwise the single shared `synced` value.
 */
export async function isNodeSynced() {
  const state = store.getState();
  const { allPeers, isMining } = state;

  const usablePeers = allPeers.filter(candidate => {
    return !(candidate.unreachable || candidate.wrongVersion);
  });
  const distinctSyncStates = uniq(usablePeers.map(candidate => candidate.synced));
  const isSynced = distinctSyncStates.length === 1 ? distinctSyncStates[0] : false;

  if (!isSynced || isMining) {
    return isSynced;
  }

  // Flag the store first, then wait for the miner to start.
  store.dispatch({ type: 'START_MINING' });
  await startMining();
  return isSynced;
}
/**
 * Opens a TCP connection to `peer` on DEFAULT_PORT and runs the block-sync
 * message exchange over it.
 *
 * Every message is a DELIMITER-joined string whose first field is its type:
 *   VERSION      version, lastBlockHash  - sent by us right after connecting
 *   GETBLOCKS    lastBlockHash           - request headers starting at a known block
 *   BLOCKHEADERS hash, hash, ...         - reply to GETBLOCKS (at most 50 hashes)
 *   REQUESTBLOCK hash                    - request one full block
 *   SENDBLOCK    json                    - reply to REQUESTBLOCK (JSON-serialized block)
 *
 * NOTE(review): each 'data' event is parsed as exactly one message. TCP is a
 * byte stream, so a large SENDBLOCK may arrive split across events and small
 * messages may be coalesced. Fixing that needs a framing change on both ends
 * of the protocol and is deliberately left out of this function.
 *
 * NOTE(review): connection failures are only logged by the 'error' handler;
 * nothing here marks the peer as unreachable in the store.
 *
 * @param {{ ip: string }} peer - Peer to connect to; only `peer.ip` is read.
 * @param {string} lastBlockHash - Hash of our latest block, sent in the VERSION handshake.
 * @param {string|number} version - Our protocol version, sent in the VERSION handshake.
 * @returns {Promise<void>} Resolves once the socket is created and its
 *   handlers are attached, not when the connection is actually established.
 */
export async function connectWithPeer(peer, lastBlockHash, version) {
  // Number of blocks announced by the latest BLOCKHEADERS message that have not
  // been stored yet. Kept at connection scope so it survives across 'data' events.
  let numBlocksToFetch = 0;

  try {
    const client = new net.Socket();

    // Serializes the fields of one message and writes it to the peer.
    const send = (...fields) => client.write(fields.join(DELIMITER));

    // Dispatches one decoded message to the matching protocol step.
    const handleMessage = async (type, args) => {
      switch (type) {
        // Peer answers our handshake with its version and last block hash.
        case 'VERSION': {
          const [ peerVersion, blockHeaderHash ] = args;
          if (peerVersion !== '1') {
            break;
          }
          // Do we already have the block the peer considers its latest?
          const knownBlock = await BlockModel.findOne({ hash: blockHeaderHash });
          if (!knownBlock) {
            // We are behind: ask for the headers that follow our own last block.
            const savedLastBlock = store.getState().lastBlock;
            if (!savedLastBlock) {
              break;
            }
            send('GETBLOCKS', savedLastBlock.getBlockHeaderHash());
            break;
          }
          store.dispatch({ type: 'SYNC_PEER', ip: peer.ip });
          await isNodeSynced();
          break;
        }

        // Peer requests block headers for up to 50 blocks.
        case 'GETBLOCKS': {
          const [ blockHeaderHash ] = args;
          const startBlock = await BlockModel.findOne({ hash: blockHeaderHash });
          if (startBlock) {
            // Sort oldest-first so the 50-block window is deterministic and the
            // receiver can apply the blocks in chain order.
            const blocksToSend = await BlockModel
              .find({ timestamp: { $gte: startBlock.timestamp } })
              .sort({ timestamp: 1 })
              .limit(50);
            send('BLOCKHEADERS', ...blocksToSend.map(blk => blk.hash));
          }
          break;
        }

        // Peer sends block headers: queue them and request each block.
        case 'BLOCKHEADERS': {
          store.dispatch({ type: 'ADD_UNFETCHED_HEADERS', headers: args });
          numBlocksToFetch = args.length;
          const { unfetchedHeaders } = store.getState();
          // TODO: spread requests across all connected peers and move on to the
          // next peer when one does not answer; for now everything is requested
          // from this connection.
          for (const header of Array.from(unfetchedHeaders)) {
            send('REQUESTBLOCK', header);
            await wait(1); // pause between requests (1 second per the original note)
            // move from unfetched => loading
            store.dispatch({ type: 'LOADING_BLOCK', header });
          }
          break;
        }

        // Peer requests a specific block: look it up and send it JSON-serialized.
        case 'REQUESTBLOCK': {
          const [ header ] = args;
          const requestedBlock = await BlockModel.findOne({ hash: header });
          if (requestedBlock) {
            send('SENDBLOCK', JSON.stringify(requestedBlock));
          }
          break;
        }

        // Peer delivers a block we asked for.
        case 'SENDBLOCK': {
          let block;
          try {
            block = JSON.parse(args[0]);
          } catch (err) {
            // Payload comes from the network; never let a bad one crash the handler.
            console.error('> Ignoring malformed SENDBLOCK payload: ', err.message);
            break;
          }
          if (block === null || typeof block !== 'object') {
            break;
          }
          // Skip blocks we already have.
          const savedBlock = await BlockModel.findOne({ hash: block.hash });
          if (savedBlock) {
            break;
          }
          const lastBlock = store.getState().lastBlock;
          if (!lastBlock) {
            break;
          }
          // Only accept the block if it extends our current chain tip.
          if (block.previousHash === lastBlock.getBlockHeaderHash()) {
            const newBlock = new BlockModel(block);
            await newBlock.save();
            // remove from orphan and unfetched / loading pools
            store.dispatch({ type: 'NEW_BLOCK', block: formatBlock(newBlock) });
            numBlocksToFetch -= 1;
            if (numBlocksToFetch === 0) {
              // TODO: restart the VERSION handshake to continue syncing.
            }
          } else {
            // TODO: keep the block in an orphan pool.
          }
          break;
        }

        default:
          // Unknown message types are ignored.
          break;
      }
    };

    client.on('data', async data => {
      const raw = data.toString();
      console.log('> Received: '.yellow, raw.replace(reg, ' '));
      const [ type, ...args ] = raw.split(DELIMITER);
      try {
        await handleMessage(type, args);
      } catch (err) {
        // Without this, a DB or store failure becomes an unhandled rejection.
        console.error('> Failed to handle message from peer: ', err);
      }
    });
    client.on('close', () => {
      console.log('> Connection closed');
    });
    client.on('timeout', () => {
      console.log('> Connection timed out ');
    });
    client.on('error', (err) => {
      console.error(err);
    });

    // PEER CONNECTED
    client.connect(DEFAULT_PORT, peer.ip, () => {
      console.log('> Connected to peer: ', peer);
      send('VERSION', version, lastBlockHash);
      store.dispatch({ type: 'CONNECT_PEER', ip: peer.ip, client });
    });
  } catch (e) {
    // Only synchronous failures (e.g. invalid connect arguments) land here;
    // network errors are reported through the 'error' event above.
    console.log('> Peer could not connect');
  }
}