Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Try to fix out of view statements #5177

Merged
merged 16 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,11 @@ async fn handle_incoming_message<'a>(
Err(DeniedStatement::NotUseful) => return None,
Err(DeniedStatement::UsefulButKnown) => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await;
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
// Note a received statement in the peer data
peer_data
.receive(&relay_parent, &fingerprint, max_message_count)
.expect("checked in `check_can_receive` above; qed");

return None
},
}
Expand Down
318 changes: 318 additions & 0 deletions node/network/statement-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,324 @@ fn peer_cant_flood_with_large_statements() {
executor::block_on(future::join(test_fut, bg));
}

// This test addresses an issue when received knowledge is not updated on a
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
// subsequent `Seconded` statements
//
// To trigger this issue, one should modify `circulate_statement` function
// where there is a selection of lucky/unlucky peers to make all peers unlucky.
// It could be done, for example, by avoiding calling to `peer_knowledge.send`:
// `let peers_to_send: Vec<(PeerId, bool)> = peers_to_send` <--- here
//
// Secondly, the proposed change to update received knowledge of a peer in the
// function `handle_incoming_message` must also be removed to reproduce the issue.
// This is done after `active_head.check_useful_or_unknown` check where a statement
// is useful but known.
#[test]
fn handle_multiple_seconded_statements() {
let hash_a = Hash::repeat_byte(1);
vstakhov marked this conversation as resolved.
Show resolved Hide resolved

let candidate = {
let mut c = dummy_committed_candidate_receipt(hash_a);
c.descriptor.relay_parent = hash_a;
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
c.descriptor.para_id = 1.into();
c
};
let candidate_hash = candidate.hash();

let peer_a = PeerId::random();
let peer_b = PeerId::random();
vstakhov marked this conversation as resolved.
Show resolved Hide resolved

let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
];

let session_info = make_session_info(validators, vec![]);

let session_index = 1;

let pool = sp_core::testing::TaskExecutor::new();
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

let (statement_req_receiver, _) = IncomingRequest::get_config_receiver();

let bg = async move {
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
let s = StatementDistributionSubsystem::new(
Arc::new(LocalKeystore::in_memory()),
statement_req_receiver,
Default::default(),
);
s.run(ctx).await.unwrap();
};

let test_fut = async move {
// register our active heads.
handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
status: LeafStatus::Fresh,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;

assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);

assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);

// notify of peers and view
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None),
),
})
.await;

handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
),
})
.await;

handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]),
),
})
.await;

handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]),
),
})
.await;

// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };

let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.await
.unwrap();

SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed")
};

// `PeerA` sends a `Seconded` message
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::Statement(
hash_a,
statement.clone().into(),
),
),
),
})
.await;

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
);

// After the first valid statement, we expect messages to be circulated
assert_matches!(
handle.recv().await,
AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(r, s)
) if r == hash_a && s == statement => {}
);

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::Statement(r, s)
),
)
) => {
assert_eq!(recipients, vec![peer_b.clone()]);
assert_eq!(r, hash_a);
assert_eq!(s, statement.clone().into());
}
);

// `PeerB` sends a `Seconded` message: valid but known
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::StatementDistributionMessage::Statement(
hash_a,
statement.clone().into(),
),
),
),
})
drahnr marked this conversation as resolved.
Show resolved Hide resolved
.await;

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_b && r == BENEFIT_VALID_STATEMENT => {}
);

// Create a `Valid` statement
let statement = {
let signing_context = SigningContext { parent_hash: hash_a, session_index };

let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Alice.to_seed()),
)
.await
.unwrap();

SignedFullStatement::sign(
&keystore,
Statement::Valid(candidate_hash),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
)
.await
.ok()
.flatten()
.expect("should be signed")
};

// `PeerA` sends a `Valid` message
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::Statement(
hash_a,
statement.clone().into(),
),
),
),
})
.await;

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
);

assert_matches!(
handle.recv().await,
AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(r, s)
) if r == hash_a && s == statement => {}
);

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::Statement(r, s)
),
)
) => {
assert_eq!(recipients, vec![peer_b.clone()]);
assert_eq!(r, hash_a);
assert_eq!(s, statement.clone().into());
}
);

// `PeerB` sends a `Valid` message
handle
.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::StatementDistributionMessage::Statement(
hash_a,
statement.clone().into(),
),
),
),
})
.await;

// We expect that this is still valid despite the fact that `PeerB` was not
// the first when sending `Seconded`
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_b && r == BENEFIT_VALID_STATEMENT => {}
);

handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};

futures::pin_mut!(test_fut);
futures::pin_mut!(bg);

executor::block_on(future::join(test_fut, bg));
}

fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {
let validator_groups: Vec<Vec<ValidatorIndex>> = groups
.iter()
Expand Down