This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Blocks propagation #364

Merged
merged 30 commits into from
Feb 8, 2016
Changes from 17 of 30 commits
Commits
cc3f712
propagade initial
NikVolf Feb 5, 2016
90e4722
Merge branch 'master' into block-propagation
NikVolf Feb 5, 2016
4af85b4
Fixed an issue with forked counters
arkpar Feb 5, 2016
b01f954
final tests
NikVolf Feb 5, 2016
b606df4
many fixes
NikVolf Feb 6, 2016
49e61b8
calculating peer highest number on fly
NikVolf Feb 6, 2016
0905372
updating peer best hash when sync
NikVolf Feb 6, 2016
6b02b6e
using rlp::encode
NikVolf Feb 6, 2016
62f3b8c
Merge branch 'master' into block-propagation
NikVolf Feb 6, 2016
9727f27
blocks + hashes
NikVolf Feb 6, 2016
391ef7e
actually should be this way
NikVolf Feb 6, 2016
8cd5527
... and test as well
NikVolf Feb 6, 2016
74c97ea
removed unused latest_number
NikVolf Feb 6, 2016
3e84691
adding expect
NikVolf Feb 6, 2016
67c5e37
review fixes
NikVolf Feb 6, 2016
0e0f1fe
tests
NikVolf Feb 6, 2016
d40d4ef
fix tests
NikVolf Feb 6, 2016
efef36b
handling sync message
NikVolf Feb 7, 2016
37cc695
Merge branch 'state' into nvolf
NikVolf Feb 7, 2016
c3f2383
Merge branch 'ark' into nvolf
NikVolf Feb 7, 2016
3f17acc
empty new block test
NikVolf Feb 7, 2016
4b1d67e
bunch of tests for new block packet
NikVolf Feb 7, 2016
e9af2df
new hashes tests
NikVolf Feb 7, 2016
69a4349
documentation
NikVolf Feb 7, 2016
b6f74bd
Merge branch 'master' into nvolf
NikVolf Feb 7, 2016
871b711
fixes for valid rlp
NikVolf Feb 7, 2016
70d59e4
Merge branch 'nvolf' into block-propagation
NikVolf Feb 7, 2016
deffb27
refactoring of report functions, some comments
NikVolf Feb 8, 2016
3dd220b
refactoring of report functions, some comments
NikVolf Feb 8, 2016
11103b0
fixed test
NikVolf Feb 8, 2016
14 changes: 13 additions & 1 deletion ethcore/src/block_queue.rs
@@ -38,6 +38,8 @@ pub struct BlockQueueInfo {
pub verified_queue_size: usize,
/// Number of blocks being verified
pub verifying_queue_size: usize,
/// Indicates queue is empty
pub empty: bool
Contributor:
is this really a necessary optimisation? (why not fn empty(&self) -> bool { verification.verified.is_empty() && verification.unverified.is_empty() && verification.verifying.is_empty() }?)

Contributor Author:
function for BlockQueueInfo?
seems reasonable
I guess 'full' should be a function as well then, right?

Contributor:
indeed.

Contributor Author:
  • resolved
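
A minimal sketch of the accessor approach discussed above, using only the queue-size fields already present on BlockQueueInfo (the method names and the explicit capacity parameter are illustrative assumptions, not the merged code):

impl BlockQueueInfo {
	/// True when no blocks are waiting in any of the three verification stages.
	pub fn is_empty(&self) -> bool {
		self.unverified_queue_size == 0
			&& self.verifying_queue_size == 0
			&& self.verified_queue_size == 0
	}

	/// True once the combined queue size reaches the given limit.
	pub fn is_full(&self, limit: usize) -> bool {
		self.unverified_queue_size + self.verifying_queue_size + self.verified_queue_size >= limit
	}
}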

}

impl BlockQueueInfo {
@@ -285,7 +287,6 @@ impl BlockQueue {
for h in hashes {
processing.remove(&h);
}
//TODO: reward peers
}

/// Removes up to `max` verified blocks from the queue
@@ -312,6 +313,7 @@ impl BlockQueue {
verified_queue_size: verification.verified.len(),
unverified_queue_size: verification.unverified.len(),
verifying_queue_size: verification.verifying.len(),
empty: verification.verified.is_empty() && verification.unverified.is_empty() && verification.verifying.is_empty(),
}
}
}
@@ -393,4 +395,14 @@ mod tests {
panic!("error importing block that has already been drained ({:?})", e);
}
}

#[test]
fn returns_empty_once_finished() {
let mut queue = get_test_queue();
queue.import_block(get_good_dummy_block()).expect("error importing block that is valid by definition");
queue.flush();
queue.drain(1);

assert!(queue.queue_info().empty);
}
}
8 changes: 6 additions & 2 deletions ethcore/src/client.rs
@@ -27,7 +27,7 @@ use spec::Spec;
use engine::Engine;
use views::HeaderView;
use block_queue::{BlockQueue, BlockQueueInfo};
use service::NetSyncMessage;
use service::{NetSyncMessage, SyncMessage};
use env_info::LastHashes;
use verification::*;
use block::*;
@@ -223,7 +223,7 @@ impl Client {
}

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, _io: &IoChannel<NetSyncMessage>) -> usize {
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
let mut ret = 0;
let mut bad = HashSet::new();
let _import_lock = self.import_lock.lock();
@@ -295,6 +295,10 @@ impl Client {
self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
ret += 1;

if self.block_queue.read().unwrap().queue_info().empty {
io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap();
}
}
self.block_queue.write().unwrap().mark_as_good(&good_blocks);
ret
2 changes: 1 addition & 1 deletion sync/Cargo.toml
@@ -14,4 +14,4 @@ clippy = "0.0.37"
log = "0.3"
env_logger = "0.3"
time = "0.1.34"

rand = "0.3.13"
194 changes: 187 additions & 7 deletions sync/src/chain.rs
@@ -62,6 +62,9 @@ const MAX_NODE_DATA_TO_SEND: usize = 1024;
const MAX_RECEIPTS_TO_SEND: usize = 1024;
const MAX_HEADERS_TO_REQUEST: usize = 512;
const MAX_BODIES_TO_REQUEST: usize = 256;
const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;

const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@@ -134,14 +137,15 @@ pub struct SyncStatus {
pub num_active_peers: usize,
}

#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
/// Peer data type requested
enum PeerAsking {
Nothing,
BlockHeaders,
BlockBodies,
}

#[derive(Clone)]
/// Syncing peer information
struct PeerInfo {
/// eth protocol version
@@ -636,6 +640,7 @@ impl ChainSync {
if start == 0 {
self.have_common_block = true; //reached genesis
self.last_imported_hash = Some(chain_info.genesis_hash);
self.last_imported_block = Some(0);
}
}
if self.have_common_block {
@@ -1030,10 +1035,6 @@ impl ChainSync {
})
}

/// Maintain other peers. Send out any new blocks and transactions
pub fn _maintain_sync(&mut self, _io: &mut SyncIo) {
}

pub fn maintain_peers(&self, io: &mut SyncIo) {
let tick = time::precise_time_s();
for (peer_id, peer) in &self.peers {
@@ -1042,20 +1043,117 @@
}
}
}
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {

fn check_resume(&mut self, io: &mut SyncIo) {
if !io.chain().queue_info().full && self.state == SyncState::Waiting {
self.state = SyncState::Idle;
self.continue_sync(io);
}
}

fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option<Bytes> {
match chain.tree_route(from, to) {
Some(route) => {
match route.blocks.len() {
0 => None,
_ => {
Some(rlp::encode(&route.blocks).to_vec())
}
}
},
None => None
}
}

fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
chain.block(&chain.chain_info().best_block_hash).expect("Creating latest block when there is none")
}

fn get_lagging_peers(&self, io: &SyncIo) -> Vec<PeerId> {
let chain = io.chain();
let chain_info = chain.chain_info();
let latest_hash = chain_info.best_block_hash;
let latest_number = chain_info.best_block_number;
self.peers.iter().filter(|&(_, peer_info)|
match io.chain().block_status(&peer_info.latest)
{
BlockStatus::InChain => {
let peer_number = HeaderView::new(&io.chain().block_header(&peer_info.latest).unwrap()).number();
peer_info.latest != latest_hash && latest_number > peer_number && latest_number - peer_number < MAX_PEER_LAG_PROPAGATION
},
_ => false
})
.map(|(peer_id, _)| peer_id)
.cloned().collect::<Vec<PeerId>>()
}

fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize {
let updated_peers = {
let lagging_peers = self.get_lagging_peers(io);

// sqrt(x)/x scaled to max u32
let fraction = (self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32;
let lucky_peers = match lagging_peers.len() {
0 ... MIN_PEERS_PROPAGATION => lagging_peers,
_ => lagging_peers.iter().filter(|_| ::rand::random::<u32>() < fraction).cloned().collect::<Vec<PeerId>>()
};

// taking at max of MAX_PEERS_PROPAGATION
lucky_peers.iter().take(min(lucky_peers.len(), MAX_PEERS_PROPAGATION)).cloned().collect::<Vec<PeerId>>()
};

let mut sent = 0;
let local_best = io.chain().chain_info().best_block_hash;
for peer_id in updated_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_PACKET, rlp);
self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").latest = local_best.clone();
sent = sent + 1;
}
sent
}

fn propagade_new_hashes(&mut self, io: &mut SyncIo) -> usize {
let updated_peers = self.get_lagging_peers(io);
let mut sent = 0;
let local_best = io.chain().chain_info().best_block_hash;
for peer_id in updated_peers {
sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &self.peers.get(&peer_id).expect("ChainSync: unknown peer").latest, &local_best) {
Some(rlp) => {
{
let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer");
peer.latest = local_best.clone();
}
self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp);
Collaborator:
Peer's latest block should be updated to prevent sending a block to the same peer twice

},
None => 0
}
}
sent
}

/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
self.check_resume(io);

let peers = self.propagade_new_hashes(io);
trace!(target: "sync", "Sent new hashes to peers: {:?}", peers);
}

pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) {
Contributor:
missing documentation.
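
A one-line doc comment would address this; something along these lines (suggested wording, not the merged text):

/// Called when the block queue reports that verification has finished;
/// propagates the latest block to peers that are behind the local chain head.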

let peers = self.propagade_blocks(io);
trace!(target: "sync", "Sent latest block to peers: {:?}", peers);
}
}

#[cfg(test)]
mod tests {
use tests::helpers::*;
use super::*;
use util::*;
use super::{PeerInfo, PeerAsking};
use ethcore::header::{BlockNumber};

#[test]
fn return_receipts_empty() {
@@ -1124,4 +1222,86 @@ mod tests {
sync.on_packet(&mut io, 1usize, super::GET_NODE_DATA_PACKET, &node_request);
assert_eq!(1, io.queue.len());
}

fn dummy_sync_with_peer(peer_latest_hash: H256) -> ChainSync {
let mut sync = ChainSync::new();
sync.peers.insert(0,
PeerInfo {
protocol_version: 0,
genesis: H256::zero(),
network_id: U256::zero(),
latest: peer_latest_hash,
difficulty: U256::zero(),
asking: PeerAsking::Nothing,
asking_blocks: Vec::<BlockNumber>::new(),
ask_time: 0f64,
});
sync
}

#[test]
fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let io = TestIo::new(&mut client, &mut queue, None);

let lagging_peers = sync.get_lagging_peers(&io);

assert_eq!(1, lagging_peers.len())
}

#[test]
fn calculates_tree_for_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(15, false);

let start = client.block_hash_delta_minus(4);
let end = client.block_hash_delta_minus(2);

// wrong way end -> start, should be None
let rlp = ChainSync::create_new_hashes_rlp(&client, &end, &start);
assert!(rlp.is_none());

let rlp = ChainSync::create_new_hashes_rlp(&client, &start, &end).unwrap();
// size of three rlp encoded hash
assert_eq!(101, rlp.len());
}

#[test]
fn sends_new_hashes_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);

let peer_count = sync.propagade_new_hashes(&mut io);

// 1 message should be sent
assert_eq!(1, io.queue.len());
// 1 peer should be updated
assert_eq!(1, peer_count);
// NEW_BLOCK_HASHES_PACKET
assert_eq!(0x01, io.queue[0].packet_id);
}

#[test]
fn sends_latest_block_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);

let peer_count = sync.propagade_blocks(&mut io);

// 1 message should be sent
assert_eq!(1, io.queue.len());
// 1 peer should be updated
assert_eq!(1, peer_count);
// NEW_BLOCK_PACKET
assert_eq!(0x07, io.queue[0].packet_id);
}
}
7 changes: 7 additions & 0 deletions sync/src/lib.rs
@@ -50,6 +50,7 @@ extern crate ethcore_util as util;
extern crate ethcore;
extern crate env_logger;
extern crate time;
extern crate rand;

use std::ops::*;
use std::sync::*;
@@ -125,4 +126,10 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
self.sync.write().unwrap().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref()));
self.sync.write().unwrap().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
}

fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
if let SyncMessage::BlockVerified = *message {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
}
}
}
44 changes: 44 additions & 0 deletions sync/src/tests/chain.rs
@@ -104,4 +104,48 @@ fn restart() {
fn status_empty() {
let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced);
}

#[test]
fn status_packet() {
let mut net = TestNet::new(2);
net.peer_mut(0).chain.add_blocks(1000, false);
net.peer_mut(1).chain.add_blocks(1, false);

net.start();

net.sync_step_peer(0);

assert_eq!(1, net.peer(0).queue.len());
assert_eq!(0x00, net.peer(0).queue[0].packet_id);
}

#[test]
fn propagade_hashes() {
let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, false);
net.peer_mut(2).chain.add_blocks(1000, false);
net.sync();

net.peer_mut(0).chain.add_blocks(10, false);
net.sync_step_peer(0);

// 2 peers to sync
assert_eq!(2, net.peer(0).queue.len());
// NEW_BLOCK_HASHES_PACKET
assert_eq!(0x01, net.peer(0).queue[0].packet_id);
}

#[test]
fn propagade_blocks() {
let mut net = TestNet::new(10);
net.peer_mut(1).chain.add_blocks(10, false);
net.sync();

net.peer_mut(0).chain.add_blocks(10, false);
net.trigger_block_verified(0);

assert!(!net.peer(0).queue.is_empty());
// NEW_BLOCK_PACKET
assert_eq!(0x07, net.peer(0).queue[0].packet_id);
}