[TestLoop] Vastly simplify LoopEventHandler (#10888)
This PR is based on #10884.

Previously, a `LoopEventHandler` provided the ability to (1) initialize,
which emitted some events before the test loop ran at all; (2) handle
an event; (3) decide whether an event could be dropped, which was used to
ignore periodic timers at the end of a test; (4) send another event of
the same type back into the test loop.

This PR removes (1), (3), (4), leaving `LoopEventHandler` with only the
ability to handle events; in other words, it is simply a function now.
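
To illustrate what "simply a function" means, here is a minimal sketch. It does not use the real `near_async` types: `Event`, `Handler`, `increment_handler`, and `dispatch` are hypothetical names made up for this example. The only idea borrowed from the diff is that a handler takes an event and some data, and returns the event back in `Err` when it does not handle it.

```rust
/// Hypothetical event type for illustration; not a near_async type.
#[derive(Debug, PartialEq)]
enum Event {
    Increment(u64),
    Other(&'static str),
}

/// A handler is just a function: it either consumes the event (`Ok`) or
/// hands it back unchanged (`Err`) so that another handler can try it.
type Handler<Data> = Box<dyn FnMut(Event, &mut Data) -> Result<(), Event>>;

/// Example handler that only understands `Event::Increment`.
fn increment_handler() -> Handler<u64> {
    Box::new(|event: Event, data: &mut u64| match event {
        Event::Increment(n) => {
            *data += n;
            Ok(())
        }
        other => Err(other),
    })
}

/// Offers the event to each handler in turn. Returns the event in `Err`
/// if no handler accepted it.
fn dispatch<Data>(
    handlers: &mut [Handler<Data>],
    mut event: Event,
    data: &mut Data,
) -> Result<(), Event> {
    for handler in handlers.iter_mut() {
        match handler(event, &mut *data) {
            Ok(()) => return Ok(()),
            Err(returned) => event = returned,
        }
    }
    Err(event)
}
```

With this shape there is no initialization hook and no implicit sender: anything else a handler needs has to be captured by the closure when it is constructed.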

Here are the use cases that needed to be tackled:
* (1), (3), (4) were used to support periodic timers. Periodic timers
were implemented by first emitting an initial event during (1); later,
when handling that event, the handler used (4) to send another event to
trigger the next tick, and so on. When the test was dropped, (3) returned
true for any periodic timer event remaining in the event loop, so that
the test didn't think there were unhandled events.
* Now, periodic timers are supported via two mechanisms: (a) by using
the production implementation based on `DelayedActionRunner`, which
periodic timers have already been refactored to use; see
`ShardsManager::periodically_resend_chunk_requests`. To replace (3), the
`DelayedActionRunner` that the TestLoop provides
(`TestLoopDelayedActionRunner`) no longer sends new events when the
test loop itself is shutting down. In production, the
`DelayedActionRunner` is implemented using the Actix context, which does
nothing if the Actix system is shutting down, so the two behaviors are
consistent. (b) by scheduling a future that uses `Clock::sleep`; see
#10884.
* (4) was used mostly for convenience, so that when creating a handler,
we do not need to pass in a `DelaySender` that can be used to send
another event back to the event loop. For example,
`route_shards_manager_network_messages` handled events of the type
`(usize, Event)`, and since (4) provided a way to send another `(usize,
Event)` back to the event loop, the handler just worked. However, this
came at a cost of extra complexity in implementing the test loop because
the event handler needed some way to have a `DelaySender<(usize,
Event)>` (in this example) to send the event, so (1) was needed to
support that (during initialization, we stored the sender in a local
field of the event handler).
* Now, (4) is removed and replaced with the need to explicitly pass in
any `DelaySender`s needed (and also the `Clock`, or the shutting-down flag).
This is not really a big deal, because these are easily available via
`test.sender()` (possibly followed by `.for_index(idx)` or `.narrow()`),
`test.clock()`, and `test.shutting_down()`.
* Because of this explicitness, I've added helper functions
(`register_delayed_action_handler` and its `_for_index` variant) to
register handlers for the delayed actions case, because otherwise the
code looks too long and cryptic.
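
The shutdown behavior described for periodic timers can be sketched with a toy model. This is not `TestLoopDelayedActionRunner`: `Runner`, `run_later`, and `run_periodic` are hypothetical names, and time is just a `u64` of virtual milliseconds. The sketch assumes only what is stated above, namely that a periodic action re-schedules itself each time it runs and that the runner drops new events once a shared shutting-down flag is set.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical runner: queues (due time in ms, action name) pairs,
/// unless the shared shutting-down flag is set.
struct Runner {
    pending: Vec<(u64, &'static str)>,
    shutting_down: Arc<AtomicBool>,
}

impl Runner {
    fn run_later(&mut self, due_ms: u64, name: &'static str) {
        if self.shutting_down.load(Ordering::Relaxed) {
            // During shutdown, silently drop the request so that the
            // periodic action stops re-arming itself.
            return;
        }
        self.pending.push((due_ms, name));
    }
}

/// Runs a periodic action for `run_for_ms` of virtual time, then shuts
/// down and drains. Returns (ticks executed, events left in the queue).
fn run_periodic(period_ms: u64, run_for_ms: u64) -> (u32, usize) {
    let shutting_down = Arc::new(AtomicBool::new(false));
    let mut runner = Runner { pending: Vec::new(), shutting_down: shutting_down.clone() };
    let mut ticks = 0u32;
    runner.run_later(period_ms, "tick");

    // Normal phase: each tick schedules the next one.
    while let Some(pos) = runner.pending.iter().position(|(due, _)| *due <= run_for_ms) {
        let (due, name) = runner.pending.remove(pos);
        ticks += 1;
        runner.run_later(due + period_ms, name);
    }

    // Shutdown phase: set the flag, then drain what is already queued.
    // The re-schedule attempts below are dropped by `run_later`.
    shutting_down.store(true, Ordering::Relaxed);
    while let Some((due, name)) = runner.pending.pop() {
        ticks += 1;
        runner.run_later(due + period_ms, name);
    }
    (ticks, runner.pending.len())
}
```

Because the flag is checked at scheduling time rather than at handling time, the queue empties on its own during the drain, and no handler needs a separate "may this event be dropped" hook.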
robin-near authored Mar 29, 2024
1 parent ea09a10 commit d92c69d
Showing 18 changed files with 592 additions and 697 deletions.
36 changes: 20 additions & 16 deletions chain/chunks/src/test/basic.rs
@@ -1,7 +1,16 @@
use std::collections::HashSet;

use crate::{
adapter::ShardsManagerRequestFromClient,
client::ShardsManagerResponse,
test_loop::{
forward_client_request_to_shards_manager, forward_network_request_to_shards_manager,
MockChainForShardsManager, MockChainForShardsManagerConfig,
},
test_utils::default_tip,
ShardsManager, CHUNK_REQUEST_RETRY,
};
use derive_enum_from_into::{EnumFrom, EnumTryInto};
use near_async::messaging::noop;
use near_async::test_loop::futures::TestLoopDelayedActionEvent;
use near_async::time;
use near_async::{
messaging::{CanSend, IntoSender},
@@ -18,20 +27,9 @@ use near_network::{
};
use near_primitives::types::{AccountId, BlockHeight};
use near_store::test_utils::create_test_store;
use std::collections::HashSet;
use tracing::log::info;

use crate::{
adapter::ShardsManagerRequestFromClient,
client::ShardsManagerResponse,
test_loop::{
forward_client_request_to_shards_manager, forward_network_request_to_shards_manager,
periodically_resend_chunk_requests, MockChainForShardsManager,
MockChainForShardsManagerConfig, ShardsManagerResendChunkRequests,
},
test_utils::default_tip,
ShardsManager, CHUNK_REQUEST_RETRY,
};

#[derive(derive_more::AsMut)]
struct TestData {
shards_manager: ShardsManager,
@@ -60,8 +58,8 @@ enum TestEvent {
NetworkToShardsManager(ShardsManagerRequestFromNetwork),
ShardsManagerToClient(ShardsManagerResponse),
ShardsManagerToNetwork(PeerManagerMessageRequest),
ShardsManagerResendRequests(ShardsManagerResendChunkRequests),
Adhoc(AdhocEvent<TestData>),
ShardsManagerDelayedActions(TestLoopDelayedActionEvent<ShardsManager>),
}

type ShardsManagerTestLoopBuilder = near_async::test_loop::TestLoopBuilder<TestEvent>;
@@ -182,8 +180,13 @@ fn test_chunk_forward() {
test.register_handler(capture_events::<PeerManagerMessageRequest>().widen());
test.register_handler(forward_client_request_to_shards_manager().widen());
test.register_handler(forward_network_request_to_shards_manager().widen());
test.register_handler(periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen());
test.register_handler(handle_adhoc_events::<TestData>().widen());
test.register_delayed_action_handler::<ShardsManager>();

test.data.shards_manager.periodically_resend_chunk_requests(
&mut test.sender().into_delayed_action_runner::<ShardsManager>(test.shutting_down()),
CHUNK_REQUEST_RETRY,
);

// We'll produce a single chunk whose next chunk producer is a chunk-only
// producer, so that we can test that the chunk is forwarded to the next
@@ -260,4 +263,5 @@ fn test_chunk_forward() {
}
}
assert!(seen_part_request);
test.shutdown_and_drain_remaining_events(time::Duration::seconds(1));
}
50 changes: 32 additions & 18 deletions chain/chunks/src/test/multi.rs
@@ -1,4 +1,16 @@
use crate::{
adapter::ShardsManagerRequestFromClient,
client::ShardsManagerResponse,
test_loop::{
forward_client_request_to_shards_manager, forward_network_request_to_shards_manager,
route_shards_manager_network_messages, MockChainForShardsManager,
MockChainForShardsManagerConfig,
},
test_utils::default_tip,
ShardsManager, CHUNK_REQUEST_RETRY,
};
use derive_enum_from_into::{EnumFrom, EnumTryInto};
use near_async::test_loop::futures::TestLoopDelayedActionEvent;
use near_async::{
messaging::IntoSender,
test_loop::{
@@ -20,19 +32,6 @@ use near_primitives::{
};
use near_store::test_utils::create_test_store;

use crate::{
adapter::ShardsManagerRequestFromClient,
client::ShardsManagerResponse,
test_loop::{
forward_client_request_to_shards_manager, forward_network_request_to_shards_manager,
periodically_resend_chunk_requests, route_shards_manager_network_messages,
MockChainForShardsManager, MockChainForShardsManagerConfig,
ShardsManagerResendChunkRequests,
},
test_utils::default_tip,
ShardsManager, CHUNK_REQUEST_RETRY,
};

#[derive(derive_more::AsMut, derive_more::AsRef)]
struct TestData {
shards_manager: ShardsManager,
@@ -50,11 +49,11 @@ impl AsMut<TestData> for TestData {
#[derive(EnumTryInto, Debug, EnumFrom)]
enum TestEvent {
Adhoc(AdhocEvent<TestData>),
ShardsManagerDelayedActions(TestLoopDelayedActionEvent<ShardsManager>),
ClientToShardsManager(ShardsManagerRequestFromClient),
NetworkToShardsManager(ShardsManagerRequestFromNetwork),
ShardsManagerToClient(ShardsManagerResponse),
OutboundNetwork(PeerManagerMessageRequest),
ShardsManagerResendChunkRequests(ShardsManagerResendChunkRequests),
}

type ShardsManagerTestLoop = near_async::test_loop::TestLoop<Vec<TestData>, (usize, TestEvent)>;
@@ -106,13 +105,24 @@ fn basic_setup(config: BasicSetupConfig) -> ShardsManagerTestLoop {
let mut test = builder.build(data);
for idx in 0..test.data.len() {
test.register_handler(handle_adhoc_events::<TestData>().widen().for_index(idx));
test.register_delayed_action_handler_for_index::<ShardsManager>(idx);
test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx));
test.register_handler(forward_network_request_to_shards_manager().widen().for_index(idx));
test.register_handler(capture_events::<ShardsManagerResponse>().widen().for_index(idx));
test.register_handler(route_shards_manager_network_messages(NETWORK_DELAY));
test.register_handler(
periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen().for_index(idx),
);
test.register_handler(route_shards_manager_network_messages(
test.sender(),
test.clock(),
NETWORK_DELAY,
));

let sender = test.sender().for_index(idx);
let shutting_down = test.shutting_down();
test.sender().for_index(idx).send_adhoc_event("start_shards_manager", |data| {
data.shards_manager.periodically_resend_chunk_requests(
&mut sender.into_delayed_action_runner(shutting_down),
CHUNK_REQUEST_RETRY,
);
})
}
test
}
@@ -175,6 +185,8 @@ fn test_distribute_chunk_basic() {
_ => panic!("Unexpected event"),
}
}

test.shutdown_and_drain_remaining_events(time::Duration::seconds(1));
}

/// Tests that when we have some block producers (validators) in the network,
@@ -237,6 +249,7 @@ fn test_distribute_chunk_track_all_shards() {
_ => panic!("Unexpected event"),
}
}
test.shutdown_and_drain_remaining_events(time::Duration::seconds(1));
}

/// Tests that when the network has some block producers and also some chunk-
@@ -348,4 +361,5 @@ fn test_distribute_chunk_with_chunk_only_producers() {
});
}
test.run_instant();
test.shutdown_and_drain_remaining_events(time::Duration::seconds(1));
}
144 changes: 68 additions & 76 deletions chain/chunks/src/test_loop.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::{collections::HashMap, sync::Arc};

use crate::{
adapter::ShardsManagerRequestFromClient,
logic::{cares_about_shard_this_or_next_epoch, make_outgoing_receipts_proofs},
test_utils::{default_tip, tip},
ShardsManager,
};
use near_async::test_loop::delay_sender::DelaySender;
use near_async::time;
use near_async::time::Clock;
use near_async::{
messaging::Sender,
test_loop::event_handler::{interval, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf},
test_loop::event_handler::{LoopEventHandler, TryIntoOrSelf},
};
use near_chain::{types::Tip, Chain};
use near_epoch_manager::{
@@ -28,13 +34,7 @@ use near_primitives::{
version::PROTOCOL_VERSION,
};
use near_store::Store;

use crate::{
adapter::ShardsManagerRequestFromClient,
logic::{cares_about_shard_this_or_next_epoch, make_outgoing_receipts_proofs},
test_utils::{default_tip, tip},
ShardsManager,
};
use std::{collections::HashMap, sync::Arc};

pub fn forward_client_request_to_shards_manager(
) -> LoopEventHandler<ShardsManager, ShardsManagerRequestFromClient> {
@@ -61,99 +61,91 @@ pub fn route_shards_manager_network_messages<
+ From<PeerManagerMessageRequest>
+ From<ShardsManagerRequestFromNetwork>,
>(
sender: DelaySender<(usize, Event)>,
clock: Clock,
network_delay: time::Duration,
) -> LoopEventHandler<Data, (usize, Event)> {
let mut route_back_lookup: HashMap<CryptoHash, usize> = HashMap::new();
let mut next_hash: u64 = 0;
LoopEventHandler::new(
move |event: (usize, Event),
data: &mut Data,
context: &LoopHandlerContext<(usize, Event)>| {
let (idx, event) = event;
let message = event.try_into_or_self().map_err(|e| (idx, e.into()))?;
match message {
PeerManagerMessageRequest::NetworkRequests(request) => {
match request {
NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => {
let target_idx = data.index_for_account(&target.account_id.unwrap());
let route_back = CryptoHash::hash_borsh(next_hash);
route_back_lookup.insert(route_back, idx);
next_hash += 1;
context.sender.send_with_delay(
LoopEventHandler::new(move |event: (usize, Event), data: &mut Data| {
let (idx, event) = event;
let message = event.try_into_or_self().map_err(|e| (idx, e.into()))?;
match message {
PeerManagerMessageRequest::NetworkRequests(request) => {
match request {
NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => {
let target_idx = data.index_for_account(&target.account_id.unwrap());
let route_back = CryptoHash::hash_borsh(next_hash);
route_back_lookup.insert(route_back, idx);
next_hash += 1;
sender.send_with_delay(
(target_idx,
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back,
}.into()),
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkResponse { route_back, response } => {
let target_idx =
*route_back_lookup.get(&route_back).expect("Route back not found");
context.sender.send_with_delay(
Ok(())
}
NetworkRequests::PartialEncodedChunkResponse { route_back, response } => {
let target_idx =
*route_back_lookup.get(&route_back).expect("Route back not found");
sender.send_with_delay(
(target_idx,
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: context.clock.now().into(), // TODO: use clock
received_time: clock.now().into(), // TODO: use clock
}.into()),
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkMessage {
account_id,
partial_encoded_chunk,
} => {
let target_idx = data.index_for_account(&account_id);
context.sender.send_with_delay(
(
target_idx,
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
partial_encoded_chunk.into(),
)
.into(),
),
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkForward { account_id, forward } => {
let target_idx = data.index_for_account(&account_id);
context.sender.send_with_delay(
(target_idx,
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(
forward,
).into()),
Ok(())
}
NetworkRequests::PartialEncodedChunkMessage {
account_id,
partial_encoded_chunk,
} => {
let target_idx = data.index_for_account(&account_id);
sender.send_with_delay(
(
target_idx,
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
partial_encoded_chunk.into(),
)
.into(),
),
network_delay,
);
Ok(())
}
NetworkRequests::PartialEncodedChunkForward { account_id, forward } => {
let target_idx = data.index_for_account(&account_id);
sender.send_with_delay(
(
target_idx,
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(
forward,
)
.into(),
),
network_delay,
);
Ok(())
}
other_message => Err((
idx,
PeerManagerMessageRequest::NetworkRequests(other_message).into(),
)),
Ok(())
}
other_message => {
Err((idx, PeerManagerMessageRequest::NetworkRequests(other_message).into()))
}
}
message => Err((idx, message.into())),
}
},
)
message => Err((idx, message.into())),
}
})
}

// NOTE: this is no longer needed for TestLoop, but some other non-TestLoop tests depend on it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShardsManagerResendChunkRequests;

/// Periodically call resend_chunk_requests.
pub fn periodically_resend_chunk_requests(
every: time::Duration,
) -> LoopEventHandler<ShardsManager, ShardsManagerResendChunkRequests> {
interval(every, ShardsManagerResendChunkRequests, |data: &mut ShardsManager| {
data.resend_chunk_requests()
})
}

/// A simple implementation of the chain side that interacts with
/// ShardsManager.
pub struct MockChainForShardsManager {
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/client_actions_test_utils.rs
@@ -6,7 +6,7 @@ use near_network::client::ClientSenderForNetworkMessage;

pub fn forward_client_messages_from_network_to_client_actions(
) -> LoopEventHandler<ClientActions, ClientSenderForNetworkMessage> {
LoopEventHandler::new(|msg, client_actions: &mut ClientActions, _| {
LoopEventHandler::new(|msg, client_actions: &mut ClientActions| {
match msg {
ClientSenderForNetworkMessage::_state_response(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));