Rewrite network protocol/service to use channels #1340
Conversation
It looks like @gterzian signed our Contributor License Agreement. 👍 Many thanks, Parity Technologies CLA Bot
Force-pushed from 03c3504 to 3f82e52
Force-pushed from 0f956fd to 7f434bc
Force-pushed from d0c9630 to f9f4198
Ok, this one is now ready for review, @tomaka? I've added one commit that introduces a basic back-pressure mechanism between libp2p and the protocol (since the protocol operations are now in their own thread, and the events from libp2p could in theory otherwise pile up in the unbounded channel used to communicate with the protocol). Please note that in a few files with "lots" of changes, and because my editor was set to indent with spaces, I ended up running […]
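For readers unfamiliar with the idea, back pressure here just means bounding how far the producer can run ahead of the consumer. Below is a minimal sketch of that idea using a bounded std channel; the event type and sizes are hypothetical, and this is not the mechanism actually used in this PR:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical event type standing in for what the libp2p side produces.
enum NetworkEvent {
    CustomMessage(Vec<u8>),
}

fn main() {
    // A bounded channel: `send` blocks once 1024 events are queued, so a slow
    // protocol thread slows the producer down instead of letting events pile
    // up without limit in an unbounded queue.
    let (tx, rx) = mpsc::sync_channel::<NetworkEvent>(1024);

    let protocol = thread::spawn(move || {
        // The consumer drains events one at a time.
        for event in rx {
            match event {
                NetworkEvent::CustomMessage(bytes) => {
                    println!("protocol got {} bytes", bytes.len());
                }
            }
        }
    });

    // Producer side, standing in for the libp2p event loop.
    for _ in 0..10 {
        tx.send(NetworkEvent::CustomMessage(vec![0u8; 16])).unwrap();
    }
    drop(tx); // closing the channel lets the protocol thread exit
    protocol.join().unwrap();
}
```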
029c255
to
d153086
Compare
After reading the diff, I think I misunderstood your description. I don't really see the point of exposing a channel in the API of […]
@tomaka I can re-introduce the […]. We could also move the equivalent of […]. Would that be better?
So […]
The nice thing is that since the […]. This use of a stream of events is really related to […]
What I'm suggesting is to instead move […]
For example, if […]. Sure, internally libp2p uses sub-tasks so that the network connections are actually asynchronous, and internally libp2p is quite similar to an actor model. However, having a simple synchronous API exposed is a very nice property in my opinion. On the other hand […]
In this proposal, you get exactly the same behavior from the perspective of […]. When […]. The question is whether […]. This particular case actually requires the "node id" of a disconnected node to be communicated back to […]
If we're using threads, perhaps we might as well go for as much parallelism as the integrity of the system can tolerate, and only do things synchronously when necessary? If a component absolutely needs to do a state transition in a "sync" way, that can be dealt with by sending a reply channel on the original message (example). Also, is the current "synchronous API" really that simple? What about the various concurrent components competing to acquire locks?
I actually did handle the messages at the place you suggest in an earlier version of this PR, and I moved the message handling to […]
core/network/src/service.rs
Outdated
-	self.handler.on_block_imported(&mut NetSyncIo::new(&self.network, self.protocol_id), hash, header)
+	let _ = self
+		.protocol_sender
+		.send(ProtocolMsg::BlockImported(hash, header.clone()));
I wonder if these functions should be changed to take the header by-value explicitly so the clone isn't hidden.
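For illustration, with made-up stand-in types rather than the actual Substrate signatures, the difference the reviewer is pointing at looks roughly like this:

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical stand-in for the real header type, just to show the distinction.
#[derive(Clone)]
struct Header {
    number: u64,
}

// By reference: the clone happens inside the function and is easy to miss.
fn import_by_ref(sender: &Sender<Header>, header: &Header) {
    let _ = sender.send(header.clone());
}

// By value: the caller decides whether (and where) to clone.
fn import_by_value(sender: &Sender<Header>, header: Header) {
    let _ = sender.send(header);
}

fn main() {
    let (tx, _rx) = channel();
    let header = Header { number: 1 };
    import_by_ref(&tx, &header);
    import_by_value(&tx, header.clone()); // the clone is now visible at the call site
}
```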
core/network/src/service.rs
Outdated
impl NetworkChan {
	/// Create a new network chan.
	pub fn new(sender: Sender<NetworkMsg>, task_notify: Arc<AtomicTask>) -> Self {
		Self {
style: using `Self` like this is weird.
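A tiny illustration of the nit, with hypothetical fields; the later commit "remove Self in new" resolves it this way, by writing the struct name out in the constructor body:

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical, trimmed-down version of the struct in question.
pub struct NetworkChan {
    sender: Sender<Vec<u8>>,
}

impl NetworkChan {
    /// Create a new network chan.
    pub fn new(sender: Sender<Vec<u8>>) -> Self {
        // Spelling the type out here reads more naturally than `Self { .. }`.
        NetworkChan { sender }
    }
}

fn main() {
    let (tx, _rx) = channel::<Vec<u8>>();
    let _chan = NetworkChan::new(tx);
}
```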
protocol.on_clogged_peer(&mut net_sync, node_index,
	messages.iter().map(|d| d.as_ref()));
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg_bytes in messages.iter().take(5) {
why take 5?
This code is from master, at substrate/core/network/src/protocol.rs, line 436 in d0f824f: `for msg_bytes in clogging_messages.take(5) {`
@tomaka would know the reason for it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't put this. I imagine someone did because it was spamming the logs too much. It's a bit stupid, as the whole point of dumping all the messages was to see the frequency of each one.
Force-pushed from 7ed111d to 9f63fbe
@rphmeier Thanks for the review, I think all your comments have been addressed...
FWIW I tested this on polkadot by syncing the alexander chain from scratch, worked fine. 👍
@andresilva thank you!
* rewrite network protocol/service to use channels
* remove use of unwrap
* re-introduce with_spec
* remove unnecessary mut
* remove unused param
* improve with_spec, add with_gossip
* rename job to task
* style: re-add comma
* remove extra string allocs
* rename use of channel
* turn TODO into FIXME
* remove mut in match
* remove Self in new
* pass headers by value to network service
* remove network sender from service
* remove TODO
* better expect
* rationalize use of network sender in ondemand
Fix #1326
Here is a work-in-progress / proof of concept for a slightly different approach to concurrency: components run in their own threads, each driven by its own "event loop" that mainly consists of receiving and handling messages sequentially.
The main benefit is that, since each component runs in its own dedicated thread, there is no need for locking of any kind. Also, you're essentially building your own "runtime", and can enable components to run independently of each other, logically "in parallel".
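As a rough sketch of what such a per-component event loop looks like (the message variants, state, and handler below are hypothetical, not the actual code in this PR):

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical subset of the messages the protocol thread might handle.
enum ProtocolMsg {
    BlockImported(u64),
    Tick,
    Stop,
}

struct Protocol {
    best_block: u64,
}

impl Protocol {
    // Returns false when the loop should shut down.
    fn handle(&mut self, msg: ProtocolMsg) -> bool {
        match msg {
            ProtocolMsg::BlockImported(n) => {
                // State is owned by this thread only: plain mutation, no Mutex.
                self.best_block = self.best_block.max(n);
                true
            }
            ProtocolMsg::Tick => true,
            ProtocolMsg::Stop => false,
        }
    }
}

// Spawn the protocol event loop; the returned sender is the only way to talk to it.
fn spawn_protocol() -> (Sender<ProtocolMsg>, thread::JoinHandle<()>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut protocol = Protocol { best_block: 0 };
        // Messages are handled strictly one after the other.
        while let Ok(msg) = rx.recv() {
            if !protocol.handle(msg) {
                break;
            }
        }
    });
    (tx, handle)
}

fn main() {
    let (to_protocol, handle) = spawn_protocol();
    to_protocol.send(ProtocolMsg::BlockImported(1)).unwrap();
    to_protocol.send(ProtocolMsg::Tick).unwrap();
    to_protocol.send(ProtocolMsg::Stop).unwrap();
    handle.join().unwrap();
}
```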
Some examples:
- `Protocol` is now running in a thread, handling `ProtocolMsg`, including the periodic `Tick` and `PropagateExtrinsics`. `Protocol` itself isn't shared, nor are its methods called, outside of its own thread. The result is this: `Service` now contains senders to both `Protocol` and `NetworkService` (libp2p), and instead of calling methods on those, it sends messages to be handled on their respective event loops.
- `Service` is still polled by the tokio runtime. However, it isn't shared with other components via the `SyncIo` anymore. Instead, it receives messages from `Protocol` (and others) via a channel, and messages are handled inside `poll`. So while this still seems to require locking `Service` before doing a `poll`, that seems to be the only case left (previously it would be locked at each call to `SyncIo`). The handling of `ServiceEvent`, enqueued on a stream by the network service, is still done on the tokio runtime as well; however, it now only consists of sending messages to `Protocol`, and, in the case of reporting a peer, back to the network service. This could be seen as a "bridge" between the tokio runtime/libp2p and the protocol.
- `NetworkLink`, shared with the import queue, is now also just a wrapper around a network sender and a protocol sender. So instead of calling methods, which required locking, it now sends messages, to be handled on the respective event loops of the protocol or the network service.

As a general note, this not only removes locks (which makes the logic of `Protocol` really just single-threaded sequential stuff), it also removes dependencies on the various "components" in your system, in various ways:

- `Service` itself has nothing to do with `Specialization` or `ExHashT`; these are related to `Protocol`, yet because the service contained a protocol, they needed to be included in its signature.
- `remove_reserved_peer` previously required locking the network service (meaning it couldn't be `poll`ed in the meantime), and the method call on protocol also required various locks, some of which could block the import queue, while others would block the network service (each method call on `SyncIo` would lock the network service, meaning it couldn't be polled in the meantime).

When is `on_peer_disconnected` called now, since it's not done in `remove_reserved_peer` anymore? When the network service receives the `RemoveReservedPeer` message, it will enqueue a new `DisconnectNode` event containing info about the nodes to be disconnected. This event will later be handled, and result in a message sent to the protocol. So the protocol removing nodes from its own state, and the network removing reserved peers, happen completely independently of each other, in non-blocking fashion. This setup could also further free up the import queue, since the shared dependency/lock on `ChainSync` is removed.

Finally, if you absolutely must block one component while waiting for an "answer" from another one, this can be implemented like this:
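A minimal sketch of that pattern, with hypothetical message and field names, using a plain std channel as the reply channel:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical message that carries its own reply channel, so the sender can
// block until the other component has finished its state transition.
enum Msg {
    IsConnected { peer: u64, reply: Sender<bool> },
}

fn main() {
    let (to_protocol, from_others) = channel::<Msg>();

    // Stand-in for the protocol event loop running in its own thread.
    let protocol = thread::spawn(move || {
        while let Ok(msg) = from_others.recv() {
            match msg {
                Msg::IsConnected { peer, reply } => {
                    // Answer from the component's own (unshared) state.
                    let _ = reply.send(peer == 42);
                }
            }
        }
    });

    // The asking side sends the question together with a reply sender,
    // then blocks only while waiting for this one answer.
    let (reply_tx, reply_rx) = channel();
    to_protocol
        .send(Msg::IsConnected { peer: 42, reply: reply_tx })
        .unwrap();
    let connected = reply_rx.recv().unwrap();
    println!("connected: {}", connected);

    drop(to_protocol); // closing the channel lets the protocol thread exit
    protocol.join().unwrap();
}
```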
The tests are going to have to be rewritten for them to pass; however, I first wanted to get your point of view on this approach.
Note that when I mention "parallelism", I'm thinking less in terms of performance and more in terms of logic (obviously, the actual parallelism will be limited by the physical reality of the machine the code is running on). So it's less about boosting performance and more about ensuring components execute independently of each other. It might bring some performance bonus as well, but that's not the goal.
cc @andresilva @tomusdrw This is a concrete example of some of the stuff I mentioned in relation to "actors"...